This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new df2a963e742 [Refactor](block) pick some trim block pr #57737 #57860 #58124 (#58209)
df2a963e742 is described below
commit df2a963e742cc0ef7a088c96fdbe53e55e68527d
Author: Pxl <[email protected]>
AuthorDate: Fri Nov 21 16:53:59 2025 +0800
[Refactor](block) pick some trim block pr #57737 #57860 #58124 (#58209)
pick from #57737 #57860 #58124
---------
Co-authored-by: HappenLee <[email protected]>
---
be/src/common/consts.h | 2 -
be/src/exec/rowid_fetcher.cpp | 16 +-
be/src/olap/base_tablet.cpp | 7 +-
be/src/olap/partial_update_info.cpp | 28 +--
be/src/olap/push_handler.cpp | 4 +-
be/src/olap/tablet_schema.h | 1 -
be/src/pipeline/exec/materialization_opertor.cpp | 5 +-
be/src/pipeline/exec/scan_operator.cpp | 7 -
be/src/pipeline/exec/schema_scan_operator.cpp | 4 +-
be/src/pipeline/exec/schema_scan_operator.h | 5 +
be/src/runtime/tablets_channel.cpp | 4 +-
.../arrow_flight/arrow_flight_batch_reader.cpp | 5 +-
.../aggregate_functions/aggregate_function_sort.h | 8 +-
be/src/vec/core/block.cpp | 211 ++-------------------
be/src/vec/core/block.h | 70 ++-----
be/src/vec/core/sort_block.cpp | 9 +-
be/src/vec/core/sort_description.h | 11 +-
.../vec/exec/format/arrow/arrow_stream_reader.cpp | 9 +-
be/src/vec/exec/format/orc/vorc_reader.cpp | 85 ++++++---
.../exec/format/parquet/vparquet_group_reader.cpp | 95 +++++++---
be/src/vec/exec/format/table/equality_delete.cpp | 42 ++--
be/src/vec/exec/format/table/iceberg_reader.cpp | 22 ++-
be/src/vec/exec/format/wal/wal_reader.cpp | 4 +-
be/src/vec/exec/jni_connector.cpp | 4 +-
be/src/vec/exec/scan/file_scanner.cpp | 21 +-
be/src/vec/exec/scan/olap_scanner.cpp | 16 +-
be/src/vec/exec/scan/scanner.cpp | 7 -
be/src/vec/functions/function_helpers.cpp | 8 -
be/src/vec/olap/block_reader.cpp | 1 -
be/src/vec/olap/vcollect_iterator.cpp | 15 +-
be/src/vec/olap/vertical_block_reader.cpp | 1 -
be/src/vec/runtime/vdata_stream_recvr.cpp | 4 +-
be/src/vec/runtime/vdata_stream_recvr.h | 7 +-
be/src/vec/sink/varrow_flight_result_writer.cpp | 8 +-
be/src/vec/sink/vdata_stream_sender.cpp | 5 +-
be/src/vec/sink/vtablet_block_convertor.cpp | 4 -
be/src/vec/sink/writer/vtablet_writer.cpp | 3 +-
be/src/vec/sink/writer/vwal_writer.cpp | 3 +-
be/src/vec/spill/spill_reader.cpp | 4 +-
be/src/vec/spill/spill_writer.cpp | 3 +-
be/test/olap/wal/wal_reader_writer_test.cpp | 10 +-
be/test/pipeline/exec/vdata_stream_recvr_test.cpp | 3 +-
.../operator/materialization_shared_state_test.cpp | 18 +-
be/test/testutil/mock/mock_data_stream_sender.h | 10 +-
be/test/vec/core/block_test.cpp | 156 +++++++--------
be/test/vec/data_types/common_data_type_test.h | 7 +-
.../vec/exec/format/parquet/parquet_read_lines.cpp | 6 +-
be/test/vec/exec/orc/orc_read_lines.cpp | 6 +-
48 files changed, 443 insertions(+), 541 deletions(-)
diff --git a/be/src/common/consts.h b/be/src/common/consts.h
index 4618ffe7d74..ca4662c839e 100644
--- a/be/src/common/consts.h
+++ b/be/src/common/consts.h
@@ -24,8 +24,6 @@ namespace BeConsts {
const std::string CSV = "csv";
const std::string CSV_WITH_NAMES = "csv_with_names";
const std::string CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
-const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__";
-const std::string BLOCK_TEMP_COLUMN_SCANNER_FILTERED =
"__TEMP__scanner_filtered";
const std::string ROWID_COL = "__DORIS_ROWID_COL__";
const std::string GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__";
const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 0f4619e1303..4e09c9ac36c 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -182,7 +182,11 @@ Status RowIDFetcher::_merge_rpc_results(const
PMultiGetRequest& request,
}
// Merge partial blocks
vectorized::Block partial_block;
- RETURN_IF_ERROR(partial_block.deserialize(resp.block()));
+ [[maybe_unused]] size_t uncompressed_size = 0;
+ [[maybe_unused]] int64_t uncompressed_time = 0;
+
+ RETURN_IF_ERROR(
+ partial_block.deserialize(resp.block(), &uncompressed_size,
&uncompressed_time));
if (partial_block.is_empty_column()) {
return Status::OK();
}
@@ -493,9 +497,10 @@ Status RowIdStorageReader::read_by_rowids(const
PMultiGetRequest& request,
<< ", be_exec_version:" << request.be_exec_version();
[[maybe_unused]] size_t compressed_size = 0;
[[maybe_unused]] size_t uncompressed_size = 0;
+ [[maybe_unused]] int64_t compress_time = 0;
int be_exec_version = request.has_be_exec_version() ?
request.be_exec_version() : 0;
RETURN_IF_ERROR(result_block.serialize(be_exec_version,
response->mutable_block(),
- &uncompressed_size,
&compressed_size,
+ &uncompressed_size,
&compressed_size, &compress_time,
segment_v2::CompressionTypePB::LZ4));
}
@@ -600,10 +605,11 @@ Status RowIdStorageReader::read_by_rowids(const
PMultiGetRequestV2& request,
[[maybe_unused]] size_t compressed_size = 0;
[[maybe_unused]] size_t uncompressed_size = 0;
+ [[maybe_unused]] int64_t compress_time = 0;
int be_exec_version = request.has_be_exec_version() ?
request.be_exec_version() : 0;
- RETURN_IF_ERROR(result_blocks[i].serialize(be_exec_version,
pblock->mutable_block(),
- &uncompressed_size,
&compressed_size,
-
segment_v2::CompressionTypePB::LZ4));
+ RETURN_IF_ERROR(result_blocks[i].serialize(
+ be_exec_version, pblock->mutable_block(),
&uncompressed_size, &compressed_size,
+ &compress_time, segment_v2::CompressionTypePB::LZ4));
}
// Build file type statistics string
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 9cde2bbdf45..2c858e557bb 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -941,11 +941,10 @@ Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr
input_rowset, uint32_t
const signed char* BaseTablet::get_delete_sign_column_data(const
vectorized::Block& block,
size_t
rows_at_least) {
- if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
- block.try_get_by_name(DELETE_SIGN);
- delete_sign_column != nullptr) {
+ if (int pos = block.get_position_by_name(DELETE_SIGN); pos != -1) {
+ const vectorized::ColumnWithTypeAndName& delete_sign_column =
block.get_by_position(pos);
const auto& delete_sign_col =
- reinterpret_cast<const
vectorized::ColumnInt8&>(*(delete_sign_column->column));
+ assert_cast<const
vectorized::ColumnInt8&>(*(delete_sign_column.column));
if (delete_sign_col.size() >= rows_at_least) {
return delete_sign_col.get_data().data();
}
diff --git a/be/src/olap/partial_update_info.cpp
b/be/src/olap/partial_update_info.cpp
index 5e6082c5c13..928ae80b38f 100644
--- a/be/src/olap/partial_update_info.cpp
+++ b/be/src/olap/partial_update_info.cpp
@@ -311,9 +311,7 @@ Status FixedReadPlan::read_columns_by_plan(
const signed char* __restrict cur_delete_signs) const {
if (force_read_old_delete_signs) {
// always read delete sign column from historical data
- if (const vectorized::ColumnWithTypeAndName* old_delete_sign_column =
- block.try_get_by_name(DELETE_SIGN);
- old_delete_sign_column == nullptr) {
+ if (block.get_position_by_name(DELETE_SIGN) == -1) {
auto del_col_cid = tablet_schema.field_index(DELETE_SIGN);
cids_to_read.emplace_back(del_col_cid);
block.swap(tablet_schema.create_block_by_cids(cids_to_read));
@@ -384,7 +382,10 @@ Status FixedReadPlan::fill_missing_columns(
old_value_block, &read_index, true,
nullptr));
const auto* old_delete_signs =
BaseTablet::get_delete_sign_column_data(old_value_block);
- DCHECK(old_delete_signs != nullptr);
+ if (old_delete_signs == nullptr) {
+ return Status::InternalError("old delete signs column not found,
block: {}",
+ old_value_block.dump_structure());
+ }
// build default value columns
auto default_value_block = old_value_block.clone_empty();
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
@@ -420,27 +421,30 @@ Status FixedReadPlan::fill_missing_columns(
}
if (should_use_default) {
- // clang-format off
if (tablet_column.has_default_value()) {
missing_col->insert_from(*mutable_default_value_columns[i], 0);
} else if (tablet_column.is_nullable()) {
- auto* nullable_column =
assert_cast<vectorized::ColumnNullable*,
TypeCheckOnRelease::DISABLE>(missing_col.get());
+ auto* nullable_column =
+
assert_cast<vectorized::ColumnNullable*>(missing_col.get());
nullable_column->insert_many_defaults(1);
} else if (tablet_schema.auto_increment_column() ==
tablet_column.name()) {
- const auto& column =
*DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name()));
+ const auto& column =
+
*DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name()));
DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
auto* auto_inc_column =
- assert_cast<vectorized::ColumnInt64*,
TypeCheckOnRelease::DISABLE>(missing_col.get());
-
auto_inc_column->insert(vectorized::Field::create_field<TYPE_BIGINT>(
-assert_cast<const vectorized::ColumnInt64*, TypeCheckOnRelease::DISABLE>(
-block->get_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL).column.get())->get_element(idx)));
+
assert_cast<vectorized::ColumnInt64*>(missing_col.get());
+ int pos =
block->get_position_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL);
+ if (pos == -1) {
+ return Status::InternalError("auto increment column
not found in block {}",
+ block->dump_structure());
+ }
+
auto_inc_column->insert_from(*block->get_by_position(pos).column.get(), idx);
} else {
// If the control flow reaches this branch, the column
neither has default value
// nor is nullable. It means that the row's delete sign is
marked, and the value
// columns are useless and won't be read. So we can just
put arbitary values in the cells
missing_col->insert(tablet_column.get_vec_type()->get_default());
}
- // clang-format on
} else {
missing_col->insert_from(*old_value_block.get_by_position(i).column,
pos_in_old_block);
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 07586acc5a2..e48585b1aac 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -478,10 +478,10 @@ Status PushBrokerReader::_cast_to_input_block() {
if (slot_desc->type()->get_primitive_type() ==
PrimitiveType::TYPE_VARIANT) {
continue;
}
- auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
// remove nullable here, let the get_function decide whether nullable
auto return_type = slot_desc->get_data_type_ptr();
idx = _src_block_name_to_idx[slot_desc->col_name()];
+ auto& arg = _src_block_ptr->get_by_position(idx);
// bitmap convert:src -> to_base64 -> bitmap_from_base64
if (slot_desc->type()->get_primitive_type() == TYPE_BITMAP) {
auto base64_return_type =
vectorized::DataTypeFactory::instance().create_data_type(
@@ -491,7 +491,7 @@ Status PushBrokerReader::_cast_to_input_block() {
RETURN_IF_ERROR(func_to_base64->execute(nullptr, *_src_block_ptr,
{idx}, idx,
arg.column->size()));
_src_block_ptr->get_by_position(idx).type =
std::move(base64_return_type);
- auto& arg_base64 =
_src_block_ptr->get_by_name(slot_desc->col_name());
+ auto& arg_base64 = _src_block_ptr->get_by_position(idx);
auto func_bitmap_from_base64 =
vectorized::SimpleFunctionFactory::instance().get_function(
"bitmap_from_base64", {arg_base64}, return_type);
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 57463316cdf..b839231464b 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -463,7 +463,6 @@ public:
void set_skip_write_index_on_load(bool skip) { _skip_write_index_on_load =
skip; }
bool skip_write_index_on_load() const { return _skip_write_index_on_load; }
int32_t delete_sign_idx() const { return _delete_sign_idx; }
- void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx =
delete_sign_idx; }
bool has_sequence_col() const { return _sequence_col_idx != -1; }
int32_t sequence_col_idx() const { return _sequence_col_idx; }
void set_version_col_idx(int32_t version_col_idx) { _version_col_idx =
version_col_idx; }
diff --git a/be/src/pipeline/exec/materialization_opertor.cpp
b/be/src/pipeline/exec/materialization_opertor.cpp
index 63fcd7b47a6..689d345dd0a 100644
--- a/be/src/pipeline/exec/materialization_opertor.cpp
+++ b/be/src/pipeline/exec/materialization_opertor.cpp
@@ -59,8 +59,11 @@ Status MaterializationSharedState::merge_multi_response() {
for (int i = 0; i < block_order_results.size(); ++i) {
for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
vectorized::Block partial_block;
+ size_t uncompressed_size = 0;
+ int64_t uncompressed_time = 0;
DCHECK(rpc_struct.response.blocks_size() > i);
-
RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block()));
+
RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block(),
+ &uncompressed_size,
&uncompressed_time));
if (rpc_struct.response.blocks(i).has_profile()) {
auto response_profile =
RuntimeProfile::from_proto(rpc_struct.response.blocks(i).profile());
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 60536cbbbc9..61e42c81503 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1318,13 +1318,6 @@ Status
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- // in inverted index apply logic, in order to optimize query performance,
- // we built some temporary columns into block, these columns only used in
scan node level,
- // remove them when query leave scan node to avoid other nodes use
block->columns() to make a wrong decision
- Defer drop_block_temp_column {[&]() {
- std::unique_lock l(local_state._block_lock);
- block->erase_tmp_columns();
- }};
if (state->is_cancelled()) {
if (local_state._scanner_ctx) {
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp
b/be/src/pipeline/exec/schema_scan_operator.cpp
index e57fbd75573..79987c001de 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -179,6 +179,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) {
int j = 0;
for (; j < columns_desc.size(); ++j) {
if (boost::iequals(_dest_tuple_desc->slots()[i]->col_name(),
columns_desc[j].name)) {
+ _slot_offsets[i] = j;
break;
}
}
@@ -250,11 +251,10 @@ Status SchemaScanOperatorX::get_block(RuntimeState*
state, vectorized::Block* bl
if (src_block.rows()) {
// block->check_number_of_rows();
for (int i = 0; i < _slot_num; ++i) {
- auto* dest_slot_desc = _dest_tuple_desc->slots()[i];
vectorized::MutableColumnPtr column_ptr =
std::move(*block->get_by_position(i).column).mutate();
column_ptr->insert_range_from(
-
*src_block.get_by_name(dest_slot_desc->col_name()).column, 0,
+
*src_block.safe_get_by_position(_slot_offsets[i]).column, 0,
src_block.rows());
}
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
block,
diff --git a/be/src/pipeline/exec/schema_scan_operator.h
b/be/src/pipeline/exec/schema_scan_operator.h
index 6846d9b59af..0634b9d5693 100644
--- a/be/src/pipeline/exec/schema_scan_operator.h
+++ b/be/src/pipeline/exec/schema_scan_operator.h
@@ -19,6 +19,8 @@
#include <stdint.h>
+#include <unordered_map>
+
#include "common/status.h"
#include "exec/schema_scanner.h"
#include "operator.h"
@@ -85,6 +87,9 @@ private:
int _tuple_idx;
// slot num need to fill in and return
int _slot_num;
+
+ // slot index mapping to src column index
+ std::unordered_map<int, int> _slot_offsets;
};
#include "common/compile_check_end.h"
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index acab5e12935..460879978cd 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -569,7 +569,9 @@ Status BaseTabletsChannel::_write_block_data(
std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs,
PTabletWriterAddBlockResult* response) {
vectorized::Block send_data;
- RETURN_IF_ERROR(send_data.deserialize(request.block()));
+ [[maybe_unused]] size_t uncompressed_size = 0;
+ [[maybe_unused]] int64_t uncompressed_time = 0;
+ RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size,
&uncompressed_time));
CHECK(send_data.rows() == request.tablet_ids_size())
<< "block rows: " << send_data.rows()
<< ", tablet_ids_size: " << request.tablet_ids_size();
diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
index 07e46cfcfed..24fc977d027 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
@@ -268,7 +268,10 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() {
{
SCOPED_ATOMIC_TIMER(&_deserialize_block_timer);
_block = vectorized::Block::create_shared();
- st = _block->deserialize(callback->response_->block());
+ [[maybe_unused]] size_t uncompressed_size = 0;
+ [[maybe_unused]] int64_t uncompressed_time = 0;
+ st = _block->deserialize(callback->response_->block(),
&uncompressed_size,
+ &uncompressed_time);
ARROW_RETURN_NOT_OK(to_arrow_status(st));
break;
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h
b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index 79fdf4979b1..3d8bc2480c2 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -72,8 +72,10 @@ struct AggregateFunctionSortData {
PBlock pblock;
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
+ int64_t compressed_time = 0;
auto st = block.serialize(state->be_exec_version(), &pblock,
&uncompressed_bytes,
- &compressed_bytes,
segment_v2::CompressionTypePB::NO_COMPRESSION);
+ &compressed_bytes, &compressed_time,
+
segment_v2::CompressionTypePB::NO_COMPRESSION);
if (!st.ok()) {
throw doris::Exception(st);
}
@@ -87,7 +89,9 @@ struct AggregateFunctionSortData {
PBlock pblock;
pblock.ParseFromString(data);
- auto st = block.deserialize(pblock);
+ [[maybe_unused]] size_t uncompressed_size = 0;
+ [[maybe_unused]] int64_t uncompressed_time = 0;
+ auto st = block.deserialize(pblock, &uncompressed_size,
&uncompressed_time);
// If memory allocate failed during deserialize, st is not ok, throw
exception here to
// stop the query.
if (!st.ok()) {
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index c09a7188d3e..dbaf8ee023b 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -82,13 +82,9 @@ template void
clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&,
template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&,
RuntimeProfile::Counter*
memory_used_counter);
-Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {
- initialize_index_by_name();
-}
+Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {}
-Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {
- initialize_index_by_name();
-}
+Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {}
Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size,
bool ignore_trivial_slot) {
@@ -112,7 +108,8 @@ Block::Block(const std::vector<SlotDescriptor>& slots,
size_t block_size,
*this = Block(slot_ptrs, block_size, ignore_trivial_slot);
}
-Status Block::deserialize(const PBlock& pblock) {
+Status Block::deserialize(const PBlock& pblock, size_t* uncompressed_bytes,
+ int64_t* decompress_time) {
swap(Block());
int be_exec_version = pblock.has_be_exec_version() ?
pblock.be_exec_version() : 0;
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
@@ -121,7 +118,7 @@ Status Block::deserialize(const PBlock& pblock) {
std::string compression_scratch;
if (pblock.compressed()) {
// Decompress
- SCOPED_RAW_TIMER(&_decompress_time_ns);
+ SCOPED_RAW_TIMER(decompress_time);
const char* compressed_data = pblock.column_values().c_str();
size_t compressed_size = pblock.column_values().size();
size_t uncompressed_size = 0;
@@ -144,7 +141,7 @@ Status Block::deserialize(const PBlock& pblock) {
compression_scratch.data());
DCHECK(success) << "snappy::RawUncompress failed";
}
- _decompressed_bytes = uncompressed_size;
+ *uncompressed_bytes = uncompressed_size;
buf = compression_scratch.data();
} else {
buf = pblock.column_values().data();
@@ -158,22 +155,14 @@ Status Block::deserialize(const PBlock& pblock) {
buf = type->deserialize(buf, &data_column,
pblock.be_exec_version()));
data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
}
- initialize_index_by_name();
return Status::OK();
}
void Block::reserve(size_t count) {
- index_by_name.reserve(count);
data.reserve(count);
}
-void Block::initialize_index_by_name() {
- for (size_t i = 0, size = data.size(); i < size; ++i) {
- index_by_name[data[i].name] = i;
- }
-}
-
void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
if (position > data.size()) {
throw Exception(ErrorCode::INTERNAL_ERROR,
@@ -181,13 +170,6 @@ void Block::insert(size_t position, const
ColumnWithTypeAndName& elem) {
data.size(), dump_names());
}
- for (auto& name_pos : index_by_name) {
- if (name_pos.second >= position) {
- ++name_pos.second;
- }
- }
-
- index_by_name.emplace(elem.name, position);
data.emplace(data.begin() + position, elem);
}
@@ -198,30 +180,20 @@ void Block::insert(size_t position,
ColumnWithTypeAndName&& elem) {
data.size(), dump_names());
}
- for (auto& name_pos : index_by_name) {
- if (name_pos.second >= position) {
- ++name_pos.second;
- }
- }
-
- index_by_name.emplace(elem.name, position);
data.emplace(data.begin() + position, std::move(elem));
}
void Block::clear_names() {
- index_by_name.clear();
for (auto& entry : data) {
entry.name.clear();
}
}
void Block::insert(const ColumnWithTypeAndName& elem) {
- index_by_name.emplace(elem.name, data.size());
data.emplace_back(elem);
}
void Block::insert(ColumnWithTypeAndName&& elem) {
- index_by_name.emplace(elem.name, data.size());
data.emplace_back(std::move(elem));
}
@@ -235,13 +207,6 @@ void Block::erase_tail(size_t start) {
DCHECK(start <= data.size()) << fmt::format(
"Position out of bound in Block::erase(), max position = {}",
data.size());
data.erase(data.begin() + start, data.end());
- for (auto it = index_by_name.begin(); it != index_by_name.end();) {
- if (it->second >= start) {
- index_by_name.erase(it++);
- } else {
- ++it;
- }
- }
}
void Block::erase(size_t position) {
@@ -253,36 +218,7 @@ void Block::erase(size_t position) {
}
void Block::erase_impl(size_t position) {
- bool need_maintain_index_by_name = true;
- if (position + 1 == data.size()) {
- index_by_name.erase(data.back().name);
- need_maintain_index_by_name = false;
- }
-
data.erase(data.begin() + position);
-
- if (need_maintain_index_by_name) {
- for (auto it = index_by_name.begin(); it != index_by_name.end();) {
- if (it->second == position) {
- index_by_name.erase(it++);
- } else {
- if (it->second > position) {
- --it->second;
- }
- ++it;
- }
- }
- }
-}
-
-void Block::erase(const String& name) {
- auto index_it = index_by_name.find(name);
- if (index_it == index_by_name.end()) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block,
name={}, block_names={}",
- name, dump_names());
- }
-
- erase_impl(index_it->second);
}
ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) {
@@ -303,54 +239,13 @@ const ColumnWithTypeAndName&
Block::safe_get_by_position(size_t position) const
return data[position];
}
-ColumnWithTypeAndName& Block::get_by_name(const std::string& name) {
- auto it = index_by_name.find(name);
- if (index_by_name.end() == it) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block,
name={}, block_names={}",
- name, dump_names());
- }
-
- return data[it->second];
-}
-
-const ColumnWithTypeAndName& Block::get_by_name(const std::string& name) const
{
- auto it = index_by_name.find(name);
- if (index_by_name.end() == it) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block,
name={}, block_names={}",
- name, dump_names());
- }
-
- return data[it->second];
-}
-
-ColumnWithTypeAndName* Block::try_get_by_name(const std::string& name) {
- auto it = index_by_name.find(name);
- if (index_by_name.end() == it) {
- return nullptr;
- }
- return &data[it->second];
-}
-
-const ColumnWithTypeAndName* Block::try_get_by_name(const std::string& name)
const {
- auto it = index_by_name.find(name);
- if (index_by_name.end() == it) {
- return nullptr;
- }
- return &data[it->second];
-}
-
-bool Block::has(const std::string& name) const {
- return index_by_name.end() != index_by_name.find(name);
-}
-
-size_t Block::get_position_by_name(const std::string& name) const {
- auto it = index_by_name.find(name);
- if (index_by_name.end() == it) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block,
name={}, block_names={}",
- name, dump_names());
+int Block::get_position_by_name(const std::string& name) const {
+ for (int i = 0; i < data.size(); i++) {
+ if (data[i].name == name) {
+ return i;
+ }
}
-
- return it->second;
+ return -1;
}
void Block::check_number_of_rows(bool allow_null_columns) const {
@@ -768,7 +663,6 @@ DataTypes Block::get_data_types() const {
void Block::clear() {
data.clear();
- index_by_name.clear();
}
void Block::clear_column_data(int64_t column_size) noexcept {
@@ -789,15 +683,6 @@ void Block::clear_column_data(int64_t column_size)
noexcept {
}
}
-void Block::erase_tmp_columns() noexcept {
- auto all_column_names = get_names();
- for (auto& name : all_column_names) {
- if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
- erase(name);
- }
- }
-}
-
void Block::clear_column_mem_not_keep(const std::vector<bool>&
column_keep_flags,
bool need_keep_first) {
if (data.size() >= column_keep_flags.size()) {
@@ -819,17 +704,14 @@ void Block::clear_column_mem_not_keep(const
std::vector<bool>& column_keep_flags
void Block::swap(Block& other) noexcept {
SCOPED_SKIP_MEMORY_CHECK();
data.swap(other.data);
- index_by_name.swap(other.index_by_name);
}
void Block::swap(Block&& other) noexcept {
SCOPED_SKIP_MEMORY_CHECK();
data = std::move(other.data);
- index_by_name = std::move(other.index_by_name);
}
void Block::shuffle_columns(const std::vector<int>& result_column_ids) {
- index_by_name.clear();
Container tmp_data;
tmp_data.reserve(result_column_ids.size());
for (const int result_column_id : result_column_ids) {
@@ -958,7 +840,8 @@ Status Block::filter_block(Block* block, size_t
filter_column_id, size_t column_
Status Block::serialize(int be_exec_version, PBlock* pblock,
/*std::string* compressed_buffer,*/ size_t*
uncompressed_bytes,
- size_t* compressed_bytes,
segment_v2::CompressionTypePB compression_type,
+ size_t* compressed_bytes, int64_t* compress_time,
+ segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data) const {
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
pblock->set_be_exec_version(be_exec_version);
@@ -998,7 +881,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
// compress
if (compression_type != segment_v2::NO_COMPRESSION &&
content_uncompressed_size > 0) {
- SCOPED_RAW_TIMER(&_compress_time_ns);
+ SCOPED_RAW_TIMER(compress_time);
pblock->set_compression_type(compression_type);
pblock->set_uncompressed_size(serialize_bytes);
@@ -1030,24 +913,6 @@ Status Block::serialize(int be_exec_version, PBlock*
pblock,
return Status::OK();
}
-MutableBlock::MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs,
int reserve_size,
- bool ignore_trivial_slot) {
- for (auto* const tuple_desc : tuple_descs) {
- for (auto* const slot_desc : tuple_desc->slots()) {
- if (ignore_trivial_slot && !slot_desc->is_materialized()) {
- continue;
- }
- _data_types.emplace_back(slot_desc->get_data_type_ptr());
- _columns.emplace_back(_data_types.back()->create_column());
- if (reserve_size != 0) {
- _columns.back()->reserve(reserve_size);
- }
- _names.push_back(slot_desc->col_name());
- }
- }
- initialize_index_by_name();
-}
-
size_t MutableBlock::rows() const {
for (const auto& column : _columns) {
if (column) {
@@ -1063,7 +928,6 @@ void MutableBlock::swap(MutableBlock& another) noexcept {
_columns.swap(another._columns);
_data_types.swap(another._data_types);
_names.swap(another._names);
- index_by_name.swap(another.index_by_name);
}
void MutableBlock::add_row(const Block* block, int row) {
@@ -1126,31 +990,6 @@ Status MutableBlock::add_rows(const Block* block, const
std::vector<int64_t>& ro
return Status::OK();
}
-void MutableBlock::erase(const String& name) {
- auto index_it = index_by_name.find(name);
- if (index_it == index_by_name.end()) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block,
name={}, block_names={}",
- name, dump_names());
- }
-
- auto position = index_it->second;
-
- _columns.erase(_columns.begin() + position);
- _data_types.erase(_data_types.begin() + position);
- _names.erase(_names.begin() + position);
-
- for (auto it = index_by_name.begin(); it != index_by_name.end();) {
- if (it->second == position) {
- index_by_name.erase(it++);
- } else {
- if (it->second > position) {
- --it->second;
- }
- ++it;
- }
- }
-}
-
Block MutableBlock::to_block(int start_column) {
return to_block(start_column, (int)_columns.size());
}
@@ -1290,26 +1129,6 @@ void MutableBlock::clear_column_data() noexcept {
}
}
-void MutableBlock::initialize_index_by_name() {
- for (size_t i = 0, size = _names.size(); i < size; ++i) {
- index_by_name[_names[i]] = i;
- }
-}
-
-bool MutableBlock::has(const std::string& name) const {
- return index_by_name.end() != index_by_name.find(name);
-}
-
-size_t MutableBlock::get_position_by_name(const std::string& name) const {
- auto it = index_by_name.find(name);
- if (index_by_name.end() == it) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block,
name={}, block_names={}",
- name, dump_names());
- }
-
- return it->second;
-}
-
std::string MutableBlock::dump_names() const {
std::string out;
for (auto it = _names.begin(); it != _names.end(); ++it) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index b3ecb58ebed..b99f2ec09ad 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -73,14 +73,7 @@ class Block {
private:
using Container = ColumnsWithTypeAndName;
- using IndexByName = phmap::flat_hash_map<String, size_t>;
Container data;
- IndexByName index_by_name;
-
- int64_t _decompress_time_ns = 0;
- int64_t _decompressed_bytes = 0;
-
- mutable int64_t _compress_time_ns = 0;
public:
Block() = default;
@@ -113,8 +106,6 @@ public:
void erase_tail(size_t start);
/// remove the columns at the specified positions
void erase(const std::set<size_t>& positions);
- /// remove the column with the specified name
- void erase(const String& name);
// T was std::set<int>, std::vector<int>, std::list<int>
template <class T>
void erase_not_in(const T& container) {
@@ -125,7 +116,13 @@ public:
std::swap(data, new_data);
}
- void initialize_index_by_name();
+ std::unordered_map<std::string, uint32_t> get_name_to_pos_map() const {
+ std::unordered_map<std::string, uint32_t> name_to_index_map;
+ for (uint32_t i = 0; i < data.size(); ++i) {
+ name_to_index_map[data[i].name] = i;
+ }
+ return name_to_index_map;
+ }
/// References are invalidated after calling functions above.
ColumnWithTypeAndName& get_by_position(size_t position) {
@@ -151,13 +148,6 @@ public:
ColumnWithTypeAndName& safe_get_by_position(size_t position);
const ColumnWithTypeAndName& safe_get_by_position(size_t position) const;
- ColumnWithTypeAndName& get_by_name(const std::string& name);
- const ColumnWithTypeAndName& get_by_name(const std::string& name) const;
-
- // return nullptr when no such column name
- ColumnWithTypeAndName* try_get_by_name(const std::string& name);
- const ColumnWithTypeAndName* try_get_by_name(const std::string& name)
const;
-
Container::iterator begin() { return data.begin(); }
Container::iterator end() { return data.end(); }
Container::const_iterator begin() const { return data.begin(); }
@@ -165,9 +155,9 @@ public:
Container::const_iterator cbegin() const { return data.cbegin(); }
Container::const_iterator cend() const { return data.cend(); }
- bool has(const std::string& name) const;
-
- size_t get_position_by_name(const std::string& name) const;
+ // Get position of column by name. Returns -1 if there is no column with that name.
+ // ATTN: this method is O(N). better maintain name -> position map in caller if you need to call it frequently.
+ int get_position_by_name(const std::string& name) const;
const ColumnsWithTypeAndName& get_columns_with_type_and_name() const;
@@ -291,10 +281,11 @@ public:
// serialize block to PBlock
Status serialize(int be_exec_version, PBlock* pblock, size_t*
uncompressed_bytes,
- size_t* compressed_bytes, segment_v2::CompressionTypePB
compression_type,
+ size_t* compressed_bytes, int64_t* compress_time,
+ segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data = false) const;
- Status deserialize(const PBlock& pblock);
+ Status deserialize(const PBlock& pblock, size_t* uncompressed_bytes,
int64_t* decompress_time);
std::unique_ptr<Block> create_same_struct_block(size_t size, bool
is_reserve = false) const;
@@ -362,15 +353,6 @@ public:
// for String type or Array<String> type
void shrink_char_type_column_suffix_zero(const std::vector<size_t>&
char_type_idx);
- int64_t get_decompress_time() const { return _decompress_time_ns; }
- int64_t get_decompressed_bytes() const { return _decompressed_bytes; }
- int64_t get_compress_time() const { return _compress_time_ns; }
-
- // remove tmp columns in block
- // in inverted index apply logic, in order to optimize query performance,
- // we built some temporary columns into block
- void erase_tmp_columns() noexcept;
-
void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
bool need_keep_first);
@@ -391,9 +373,6 @@ private:
DataTypes _data_types;
std::vector<std::string> _names;
- using IndexByName = phmap::flat_hash_map<String, size_t>;
- IndexByName index_by_name;
-
public:
static MutableBlock build_mutable_block(Block* block) {
return block == nullptr ? MutableBlock() : MutableBlock(block);
@@ -401,27 +380,19 @@ public:
MutableBlock() = default;
~MutableBlock() = default;
- MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, int
reserve_size = 0,
- bool igore_trivial_slot = false);
-
MutableBlock(Block* block)
: _columns(block->mutate_columns()),
_data_types(block->get_data_types()),
- _names(block->get_names()) {
- initialize_index_by_name();
- }
+ _names(block->get_names()) {}
MutableBlock(Block&& block)
: _columns(block.mutate_columns()),
_data_types(block.get_data_types()),
- _names(block.get_names()) {
- initialize_index_by_name();
- }
+ _names(block.get_names()) {}
void operator=(MutableBlock&& m_block) {
_columns = std::move(m_block._columns);
_data_types = std::move(m_block._data_types);
_names = std::move(m_block._names);
- initialize_index_by_name();
}
size_t rows() const;
@@ -552,7 +523,6 @@ public:
_columns[i] = _data_types[i]->create_column();
}
}
- initialize_index_by_name();
} else {
if (_columns.size() != block.columns()) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
@@ -599,9 +569,6 @@ public:
Status add_rows(const Block* block, size_t row_begin, size_t length);
Status add_rows(const Block* block, const std::vector<int64_t>& rows);
- /// remove the column with the specified name
- void erase(const String& name);
-
std::string dump_data(size_t row_limit = 100) const;
std::string dump_data_json(size_t row_limit = 100) const;
@@ -627,15 +594,8 @@ public:
std::vector<std::string>& get_names() { return _names; }
- bool has(const std::string& name) const;
-
- size_t get_position_by_name(const std::string& name) const;
-
/** Get a list of column names separated by commas. */
std::string dump_names() const;
-
-private:
- void initialize_index_by_name();
};
struct IteratorRowRef {
diff --git a/be/src/vec/core/sort_block.cpp b/be/src/vec/core/sort_block.cpp
index 20bf1f952af..75aa9d85a12 100644
--- a/be/src/vec/core/sort_block.cpp
+++ b/be/src/vec/core/sort_block.cpp
@@ -32,10 +32,7 @@ ColumnsWithSortDescriptions
get_columns_with_sort_description(const Block& block
for (size_t i = 0; i < size; ++i) {
const IColumn* column =
- !description[i].column_name.empty()
- ?
block.get_by_name(description[i].column_name).column.get()
- :
block.safe_get_by_position(description[i].column_number).column.get();
-
+
block.safe_get_by_position(description[i].column_number).column.get();
res.emplace_back(column, description[i]);
}
@@ -53,9 +50,7 @@ void sort_block(Block& src_block, Block& dest_block, const
SortDescription& desc
bool reverse = description[0].direction == -1;
const IColumn* column =
- !description[0].column_name.empty()
- ?
src_block.get_by_name(description[0].column_name).column.get()
- :
src_block.safe_get_by_position(description[0].column_number).column.get();
+
src_block.safe_get_by_position(description[0].column_number).column.get();
IColumn::Permutation perm;
column->get_permutation(reverse, limit,
description[0].nulls_direction, perm);
diff --git a/be/src/vec/core/sort_description.h b/be/src/vec/core/sort_description.h
index cdee17f7651..4d1543d6904 100644
--- a/be/src/vec/core/sort_description.h
+++ b/be/src/vec/core/sort_description.h
@@ -20,20 +20,15 @@
#pragma once
-#include "cstddef"
-#include "memory"
-#include "string"
-#include "vec/core/field.h"
#include "vector"
namespace doris::vectorized {
/// Description of the sorting rule by one column.
struct SortColumnDescription {
- std::string column_name; /// The name of the column.
- int column_number; /// Column number (used if no name is given).
- int direction; /// 1 - ascending, -1 - descending.
- int nulls_direction; /// 1 - NULLs and NaNs are greater, -1 - less.
+ int column_number; /// Column number (used if no name is given).
+ int direction; /// 1 - ascending, -1 - descending.
+ int nulls_direction; /// 1 - NULLs and NaNs are greater, -1 - less.
SortColumnDescription(int column_number_, int direction_, int
nulls_direction_)
: column_number(column_number_),
diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
index def1645f323..dfd4cdcfc91 100644
--- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
+++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
@@ -103,12 +103,17 @@ Status ArrowStreamReader::get_next_block(Block* block,
size_t* read_rows, bool*
auto num_columns = batch.num_columns();
for (int c = 0; c < num_columns; ++c) {
arrow::Array* column = batch.column(c).get();
-
std::string column_name = batch.schema()->field(c)->name();
try {
const vectorized::ColumnWithTypeAndName& column_with_name =
- block->get_by_name(column_name);
+ block->safe_get_by_position(c);
+
+ if (column_with_name.name != column_name) {
+ return Status::InternalError("Column name mismatch:
expected {}, got {}",
+ column_with_name.name,
column_name);
+ }
+
RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
column_with_name.column->assume_mutable_ref(), column,
0, num_rows, _ctzz));
} catch (Exception& e) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 7da93423dca..d72439e3c34 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -1277,9 +1277,10 @@ Status OrcReader::_fill_partition_columns(
const std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>&
partition_columns) {
DataTypeSerDe::FormatOptions _text_formatOptions;
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block->get_name_to_pos_map();
for (const auto& kv : partition_columns) {
- auto doris_column = block->get_by_name(kv.first).column;
- auto* col_ptr = const_cast<IColumn*>(doris_column.get());
+ auto col_ptr =
block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
const auto& [value, slot_desc] = kv.second;
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
@@ -1305,10 +1306,18 @@ Status OrcReader::_fill_partition_columns(
Status OrcReader::_fill_missing_columns(
Block* block, uint64_t rows,
const std::unordered_map<std::string, VExprContextSPtr>&
missing_columns) {
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block->get_name_to_pos_map();
+ std::set<size_t> positions_to_erase;
for (const auto& kv : missing_columns) {
+ if (!name_to_pos_map.contains(kv.first)) {
+ return Status::InternalError("Failed to find missing column: {},
block: {}", kv.first,
+ block->dump_structure());
+ }
if (kv.second == nullptr) {
// no default column, fill with null
- auto mutable_column =
block->get_by_name(kv.first).column->assume_mutable();
+ auto mutable_column =
+
block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
auto* nullable_column =
static_cast<vectorized::ColumnNullable*>(mutable_column.get());
nullable_column->insert_many_defaults(rows);
} else {
@@ -1328,15 +1337,16 @@ Status OrcReader::_fill_missing_columns(
mutable_column->resize(rows);
// result_column_ptr maybe a ColumnConst, convert it to a normal column
result_column_ptr =
result_column_ptr->convert_to_full_column_if_const();
- auto origin_column_type = block->get_by_name(kv.first).type;
+ auto origin_column_type =
block->get_by_position(name_to_pos_map[kv.first]).type;
bool is_nullable = origin_column_type->is_nullable();
block->replace_by_position(
- block->get_position_by_name(kv.first),
+ name_to_pos_map[kv.first],
is_nullable ? make_nullable(result_column_ptr) :
result_column_ptr);
- block->erase(result_column_id);
+ positions_to_erase.insert(result_column_id);
}
}
}
+ block->erase(positions_to_erase);
return Status::OK();
}
@@ -2009,8 +2019,10 @@ Status OrcReader::_get_next_block_impl(Block* block,
size_t* read_rows, bool* eo
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, _batch.get(), 0);
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
- auto& column_with_type_and_name = block->get_by_name(col_name);
+ auto& column_with_type_and_name =
block->get_by_position(name_to_pos_map[col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name =
_table_info_node_ptr->children_file_column_name(col_name);
@@ -2076,10 +2088,17 @@ Status OrcReader::_get_next_block_impl(Block* block,
size_t* read_rows, bool* eo
}
}
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block->get_name_to_pos_map();
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
for (auto& dict_filter_cols : _dict_filter_cols) {
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
- size_t pos =
block->get_position_by_name(dict_filter_cols.first);
+ if (!name_to_pos_map.contains(dict_filter_cols.first)) {
+ return Status::InternalError(
+ "Failed to find dict filter column '{}' in block
{}",
+ dict_filter_cols.first, block->dump_structure());
+ }
+ auto pos = name_to_pos_map[dict_filter_cols.first];
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
@@ -2101,7 +2120,7 @@ Status OrcReader::_get_next_block_impl(Block* block,
size_t* read_rows, bool* eo
_fill_batch_vec(batch_vec, _batch.get(), 0);
for (auto& col_name : _lazy_read_ctx.all_read_columns) {
- auto& column_with_type_and_name = block->get_by_name(col_name);
+ auto& column_with_type_and_name =
block->get_by_position(name_to_pos_map[col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name =
_table_info_node_ptr->children_file_column_name(col_name);
@@ -2212,19 +2231,27 @@ void OrcReader::_build_delete_row_filter(const Block*
block, size_t rows) {
if (_delete_rows != nullptr) {
_delete_rows_filter_ptr = std::make_unique<IColumn::Filter>(rows, 1);
auto* __restrict _pos_delete_filter_data =
_delete_rows_filter_ptr->data();
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block->get_name_to_pos_map();
const auto& original_transaction_column = assert_cast<const
ColumnInt64&>(*remove_nullable(
-
block->get_by_name(TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE).column));
- const auto& bucket_id_column = assert_cast<const ColumnInt32&>(
-
*remove_nullable(block->get_by_name(TransactionalHive::BUCKET_LOWER_CASE).column));
- const auto& row_id_column = assert_cast<const ColumnInt64&>(
-
*remove_nullable(block->get_by_name(TransactionalHive::ROW_ID_LOWER_CASE).column));
+ block->get_by_position(
+
name_to_pos_map[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
+ .column));
+ const auto& bucket_id_column = assert_cast<const
ColumnInt32&>(*remove_nullable(
+
block->get_by_position(name_to_pos_map[TransactionalHive::BUCKET_LOWER_CASE])
+ .column));
+ const auto& row_id_column = assert_cast<const
ColumnInt64&>(*remove_nullable(
+
block->get_by_position(name_to_pos_map[TransactionalHive::ROW_ID_LOWER_CASE])
+ .column));
for (int i = 0; i < rows; ++i) {
auto original_transaction = original_transaction_column.get_int(i);
auto bucket_id = bucket_id_column.get_int(i);
auto row_id = row_id_column.get_int(i);
- TransactionalHiveReader::AcidRowID transactional_row_id =
{original_transaction,
-
bucket_id, row_id};
+ TransactionalHiveReader::AcidRowID transactional_row_id = {
+ .original_transaction = original_transaction,
+ .bucket = bucket_id,
+ .row_id = row_id};
if (_delete_rows->contains(transactional_row_id)) {
_pos_delete_filter_data[i] = 0;
}
@@ -2238,9 +2265,15 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data,
uint16_t* sel, uint16_t s
size_t origin_column_num = block->columns();
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& dict_filter_cols : _dict_filter_cols) {
+ if (!name_to_pos_map.contains(dict_filter_cols.first)) {
+ return Status::InternalError("Failed to find dict filter
column '{}' in block {}",
+ dict_filter_cols.first,
block->dump_structure());
+ }
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
- size_t pos = block->get_position_by_name(dict_filter_cols.first);
+ auto pos = name_to_pos_map[dict_filter_cols.first];
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
@@ -2266,8 +2299,10 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data,
uint16_t* sel, uint16_t s
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
}
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& table_col_name : table_col_names) {
- auto& column_with_type_and_name = block->get_by_name(table_col_name);
+ auto& column_with_type_and_name =
block->get_by_position(name_to_pos_map[table_col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name =
_table_info_node_ptr->children_file_column_name(table_col_name);
@@ -2319,13 +2354,13 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data,
uint16_t* sel, uint16_t s
if (can_filter_all) {
for (auto& col : table_col_names) {
// clean block to read predicate columns and acid columns
- block->get_by_name(col).column->assume_mutable()->clear();
+
block->get_by_position(name_to_pos_map[col]).column->assume_mutable()->clear();
}
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
- block->get_by_name(col.first).column->assume_mutable()->clear();
+
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
}
for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
- block->get_by_name(col.first).column->assume_mutable()->clear();
+
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
}
Block::erase_useless_column(block, origin_column_num);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
@@ -2639,8 +2674,14 @@ Status OrcReader::_convert_dict_cols_to_string_cols(
return Status::OK();
}
if (!_dict_filter_cols.empty()) {
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block->get_name_to_pos_map();
for (auto& dict_filter_cols : _dict_filter_cols) {
- size_t pos = block->get_position_by_name(dict_filter_cols.first);
+ if (!name_to_pos_map.contains(dict_filter_cols.first)) {
+ return Status::InternalError("Failed to find dict filter
column '{}' in block {}",
+ dict_filter_cols.first,
block->dump_structure());
+ }
+ auto pos = name_to_pos_map[dict_filter_cols.first];
ColumnWithTypeAndName& column_with_type_and_name =
block->get_by_position(pos);
const ColumnPtr& column = column_with_type_and_name.column;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 96feeceea8d..c0a4b2a1704 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -25,6 +25,7 @@
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
+#include <memory>
#include <ostream>
#include "common/config.h"
@@ -392,25 +393,32 @@ Status RowGroupReader::_read_column_data(Block* block,
FilterMap& filter_map) {
size_t batch_read_rows = 0;
bool has_eof = false;
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_idx = block->get_name_to_pos_map();
for (auto& read_col_name : table_columns) {
- auto& column_with_type_and_name = block->get_by_name(read_col_name);
+ auto& column_with_type_and_name =
block->safe_get_by_position(name_to_idx[read_col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
bool is_dict_filter = false;
for (auto& _dict_filter_col : _dict_filter_cols) {
if (_dict_filter_col.first == read_col_name) {
MutableColumnPtr dict_column = ColumnInt32::create();
- size_t pos = block->get_position_by_name(read_col_name);
+ if (!name_to_idx.contains(read_col_name)) {
+ return Status::InternalError(
+ "Wrong read column '{}' in parquet file, block:
{}", read_col_name,
+ block->dump_structure());
+ }
if (column_type->is_nullable()) {
- block->get_by_position(pos).type =
+ block->get_by_position(name_to_idx[read_col_name]).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
- pos,
+ name_to_idx[read_col_name],
ColumnNullable::create(std::move(dict_column),
ColumnUInt8::create(dict_column->size(), 0)));
} else {
- block->get_by_position(pos).type =
std::make_shared<DataTypeInt32>();
- block->replace_by_position(pos, std::move(dict_column));
+ block->get_by_position(name_to_idx[read_col_name]).type =
+ std::make_shared<DataTypeInt32>();
+ block->replace_by_position(name_to_idx[read_col_name],
std::move(dict_column));
}
is_dict_filter = true;
break;
@@ -511,20 +519,25 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
}
const uint8_t* __restrict filter_map_data = result_filter.data();
- filter_map_ptr.reset(new FilterMap());
+ filter_map_ptr = std::make_unique<FilterMap>();
RETURN_IF_ERROR(filter_map_ptr->init(filter_map_data, pre_read_rows,
can_filter_all));
if (filter_map_ptr->filter_all()) {
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
- for (auto& col : _lazy_read_ctx.predicate_columns.first) {
+ auto name_to_idx = block->get_name_to_pos_map();
+ for (const auto& col : _lazy_read_ctx.predicate_columns.first)
{
// clean block to read predicate columns
- block->get_by_name(col).column->assume_mutable()->clear();
+
block->get_by_position(name_to_idx[col]).column->assume_mutable()->clear();
}
- for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
-
block->get_by_name(col.first).column->assume_mutable()->clear();
+ for (const auto& col :
_lazy_read_ctx.predicate_partition_columns) {
+ block->get_by_position(name_to_idx[col.first])
+ .column->assume_mutable()
+ ->clear();
}
- for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
-
block->get_by_name(col.first).column->assume_mutable()->clear();
+ for (const auto& col :
_lazy_read_ctx.predicate_missing_columns) {
+ block->get_by_position(name_to_idx[col.first])
+ .column->assume_mutable()
+ ->clear();
}
if (_row_id_column_iterator_pair.first != nullptr) {
block->get_by_position(_row_id_column_iterator_pair.second)
@@ -660,10 +673,12 @@ Status RowGroupReader::_fill_partition_columns(
const std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>&
partition_columns) {
DataTypeSerDe::FormatOptions _text_formatOptions;
- for (auto& kv : partition_columns) {
- auto doris_column = block->get_by_name(kv.first).column;
- IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
- auto& [value, slot_desc] = kv.second;
+ auto name_to_idx = block->get_name_to_pos_map();
+ for (const auto& kv : partition_columns) {
+ auto doris_column =
block->get_by_position(name_to_idx[kv.first]).column;
+ // obtained from block*, it is a mutable object.
+ auto* col_ptr = const_cast<IColumn*>(doris_column.get());
+ const auto& [value, slot_desc] = kv.second;
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
uint64_t num_deserialized = 0;
@@ -688,15 +703,23 @@ Status RowGroupReader::_fill_partition_columns(
Status RowGroupReader::_fill_missing_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContextSPtr>&
missing_columns) {
- for (auto& kv : missing_columns) {
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_idx = block->get_name_to_pos_map();
+ std::set<size_t> positions_to_erase;
+ for (const auto& kv : missing_columns) {
+ if (!name_to_idx.contains(kv.first)) {
+ return Status::InternalError("Missing column: {} not found in
block {}", kv.first,
+ block->dump_structure());
+ }
if (kv.second == nullptr) {
// no default column, fill with null
- auto mutable_column =
block->get_by_name(kv.first).column->assume_mutable();
+ auto mutable_column =
+
block->get_by_position(name_to_idx[kv.first]).column->assume_mutable();
auto* nullable_column =
assert_cast<vectorized::ColumnNullable*>(mutable_column.get());
nullable_column->insert_many_defaults(rows);
} else {
// fill with default value
- auto& ctx = kv.second;
+ const auto& ctx = kv.second;
auto origin_column_num = block->columns();
int result_column_id = -1;
// PT1 => dest primitive type
@@ -711,15 +734,16 @@ Status RowGroupReader::_fill_missing_columns(
mutable_column->resize(rows);
// result_column_ptr maybe a ColumnConst, convert it to a normal column
result_column_ptr =
result_column_ptr->convert_to_full_column_if_const();
- auto origin_column_type = block->get_by_name(kv.first).type;
+ auto origin_column_type =
block->get_by_position(name_to_idx[kv.first]).type;
bool is_nullable = origin_column_type->is_nullable();
block->replace_by_position(
- block->get_position_by_name(kv.first),
+ name_to_idx[kv.first],
is_nullable ? make_nullable(result_column_ptr) :
result_column_ptr);
- block->erase(result_column_id);
+ positions_to_erase.insert(result_column_id);
}
}
}
+ block->erase(positions_to_erase);
return Status::OK();
}
@@ -1071,13 +1095,20 @@ Status
RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes,
}
void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_idx = block->get_name_to_pos_map();
for (auto& dict_filter_cols : _dict_filter_cols) {
- size_t pos = block->get_position_by_name(dict_filter_cols.first);
- ColumnWithTypeAndName& column_with_type_and_name =
block->get_by_position(pos);
+ if (!name_to_idx.contains(dict_filter_cols.first)) {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "Wrong read column '{}' in parquet file, block:
{}",
+ dict_filter_cols.first, block->dump_structure());
+ }
+ ColumnWithTypeAndName& column_with_type_and_name =
+ block->get_by_position(name_to_idx[dict_filter_cols.first]);
const ColumnPtr& column = column_with_type_and_name.column;
- if (auto* nullable_column =
check_and_get_column<ColumnNullable>(*column)) {
+ if (const auto* nullable_column =
check_and_get_column<ColumnNullable>(*column)) {
const ColumnPtr& nested_column =
nullable_column->get_nested_column_ptr();
- const ColumnInt32* dict_column = assert_cast<const
ColumnInt32*>(nested_column.get());
+ const auto* dict_column = assert_cast<const
ColumnInt32*>(nested_column.get());
DCHECK(dict_column);
MutableColumnPtr string_column =
@@ -1087,16 +1118,18 @@ void
RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
column_with_type_and_name.type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
block->replace_by_position(
- pos, ColumnNullable::create(std::move(string_column),
-
nullable_column->get_null_map_column_ptr()));
+ name_to_idx[dict_filter_cols.first],
+ ColumnNullable::create(std::move(string_column),
+
nullable_column->get_null_map_column_ptr()));
} else {
- const ColumnInt32* dict_column = assert_cast<const
ColumnInt32*>(column.get());
+ const auto* dict_column = assert_cast<const
ColumnInt32*>(column.get());
MutableColumnPtr string_column =
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
dict_column);
column_with_type_and_name.type =
std::make_shared<DataTypeString>();
- block->replace_by_position(pos, std::move(string_column));
+ block->replace_by_position(name_to_idx[dict_filter_cols.first],
+ std::move(string_column));
}
}
}
diff --git a/be/src/vec/exec/format/table/equality_delete.cpp b/be/src/vec/exec/format/table/equality_delete.cpp
index 7f8452f4b18..48914a02144 100644
--- a/be/src/vec/exec/format/table/equality_delete.cpp
+++ b/be/src/vec/exec/format/table/equality_delete.cpp
@@ -45,15 +45,16 @@ Status SimpleEqualityDelete::_build_set() {
Status SimpleEqualityDelete::filter_data_block(Block* data_block) {
SCOPED_TIMER(equality_delete_time);
- auto* column_and_type = data_block->try_get_by_name(_delete_column_name);
- if (column_and_type == nullptr) {
- return Status::InternalError("Can't find the delete column '{}' in
data file",
- _delete_column_name);
+ int pos = data_block->get_position_by_name(_delete_column_name);
+ if (pos == -1) {
+ return Status::InternalError("Column '{}' not found in data block:
{}", _delete_column_name,
+ data_block->dump_structure());
}
- if (column_and_type->type->get_primitive_type() != _delete_column_type) {
+ auto column_and_type = data_block->get_by_position(pos);
+ if (column_and_type.type->get_primitive_type() != _delete_column_type) {
return Status::InternalError(
"Not support type change in column '{}', src type: {}, target
type: {}",
- _delete_column_name, column_and_type->type->get_name(),
(int)_delete_column_type);
+ _delete_column_name, column_and_type.type->get_name(),
(int)_delete_column_type);
}
size_t rows = data_block->rows();
// _filter: 1 => in _hybrid_set; 0 => not in _hybrid_set
@@ -64,12 +65,12 @@ Status SimpleEqualityDelete::filter_data_block(Block*
data_block) {
_filter->assign(rows, UInt8(0));
}
- if (column_and_type->column->is_nullable()) {
+ if (column_and_type.column->is_nullable()) {
const NullMap& null_map =
- reinterpret_cast<const
ColumnNullable*>(column_and_type->column.get())
+ reinterpret_cast<const
ColumnNullable*>(column_and_type.column.get())
->get_null_map_data();
_hybrid_set->find_batch_nullable(
-
remove_nullable(column_and_type->column)->assume_mutable_ref(), rows, null_map,
+ remove_nullable(column_and_type.column)->assume_mutable_ref(),
rows, null_map,
*_filter);
if (_hybrid_set->contain_null()) {
auto* filter_data = _filter->data();
@@ -78,7 +79,7 @@ Status SimpleEqualityDelete::filter_data_block(Block*
data_block) {
}
}
} else {
- _hybrid_set->find_batch(column_and_type->column->assume_mutable_ref(),
rows, *_filter);
+ _hybrid_set->find_batch(column_and_type.column->assume_mutable_ref(),
rows, *_filter);
}
// should reverse _filter
auto* filter_data = _filter->data();
@@ -108,19 +109,22 @@ Status MultiEqualityDelete::_build_set() {
Status MultiEqualityDelete::filter_data_block(Block* data_block) {
SCOPED_TIMER(equality_delete_time);
size_t column_index = 0;
- for (std::string column_name : _delete_block->get_names()) {
- auto* column_and_type = data_block->try_get_by_name(column_name);
- if (column_and_type == nullptr) {
- return Status::InternalError("Can't find the delete column '{}' in
data file",
- column_name);
+
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = data_block->get_name_to_pos_map();
+ for (auto delete_col : _delete_block->get_columns_with_type_and_name()) {
+ const std::string& column_name = delete_col.name;
+ auto column_and_type =
data_block->safe_get_by_position(name_to_pos_map[column_name]);
+ if (name_to_pos_map.contains(column_name) == false) {
+ return Status::InternalError("Column '{}' not found in data block:
{}", column_name,
+ data_block->dump_structure());
}
- if
(!_delete_block->get_by_name(column_name).type->equals(*column_and_type->type))
{
+ if (!delete_col.type->equals(*column_and_type.type)) {
return Status::InternalError(
"Not support type change in column '{}', src type: {},
target type: {}",
- column_name,
_delete_block->get_by_name(column_name).type->get_name(),
- column_and_type->type->get_name());
+ column_name, delete_col.type->get_name(),
column_and_type.type->get_name());
}
- _data_column_index[column_index++] =
data_block->get_position_by_name(column_name);
+ _data_column_index[column_index++] = name_to_pos_map[column_name];
}
size_t rows = data_block->rows();
_data_hashes.clear();
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index a9640e353b1..9b78f11a177 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -223,20 +223,32 @@ void IcebergTableReader::_generate_equality_delete_block(
}
Status IcebergTableReader::_expand_block_if_need(Block* block) {
+ std::set<std::string> names;
+ auto block_names = block->get_names();
+ names.insert(block_names.begin(), block_names.end());
for (auto& col : _expand_columns) {
col.column->assume_mutable()->clear();
- if (block->try_get_by_name(col.name)) {
+ if (names.contains(col.name)) {
return Status::InternalError("Wrong expand column '{}'", col.name);
}
+ names.insert(col.name);
block->insert(col);
}
return Status::OK();
}
Status IcebergTableReader::_shrink_block_if_need(Block* block) {
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block->get_name_to_pos_map();
+ std::set<size_t> positions_to_erase;
for (const std::string& expand_col : _expand_col_names) {
- block->erase(expand_col);
+ if (!name_to_pos_map.contains(expand_col)) {
+ return Status::InternalError("Wrong erase column '{}', block: {}",
expand_col,
+ block->dump_names());
+ }
+ positions_to_erase.emplace(name_to_pos_map[expand_col]);
}
+ block->erase(positions_to_erase);
return Status::OK();
}
@@ -383,9 +395,11 @@ void
IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& d
void IcebergTableReader::_gen_position_delete_file_range(Block& block,
DeleteFile* position_delete,
size_t read_rows,
bool
file_path_column_dictionary_coded) {
- ColumnPtr path_column = block.get_by_name(ICEBERG_FILE_PATH).column;
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block.get_name_to_pos_map();
+ ColumnPtr path_column =
block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column;
DCHECK_EQ(path_column->size(), read_rows);
- ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column;
+ ColumnPtr pos_column =
block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column;
using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
const int64_t* src_data = assert_cast<const
ColumnType&>(*pos_column).get_data().data();
IcebergTableReader::PositionDeleteRange range;
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp
b/be/src/vec/exec/format/wal/wal_reader.cpp
index bb9b4318e9c..2390f210d3e 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -60,7 +60,9 @@ Status WalReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
_wal_path);
}
Block src_block;
- RETURN_IF_ERROR(src_block.deserialize(pblock));
+ size_t uncompressed_size = 0;
+ int64_t uncompressed_time = 0;
+ RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size,
&uncompressed_time));
//convert to dst block
Block dst_block;
int index = 0;
diff --git a/be/src/vec/exec/jni_connector.cpp
b/be/src/vec/exec/jni_connector.cpp
index 91457cb7718..7168639c59a 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -324,8 +324,10 @@ Status JniConnector::_fill_block(Block* block, size_t
num_rows) {
SCOPED_RAW_TIMER(&_fill_block_watcher);
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+ // todo: maybe do not need to build name to index map every time
+ auto name_to_pos_map = block->get_name_to_pos_map();
for (int i = 0; i < _column_names.size(); ++i) {
- auto& column_with_type_and_name = block->get_by_name(_column_names[i]);
+ auto& column_with_type_and_name =
block->get_by_position(name_to_pos_map[_column_names[i]]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
RETURN_IF_ERROR(_fill_column(_table_meta, column_ptr, column_type,
num_rows));
diff --git a/be/src/vec/exec/scan/file_scanner.cpp
b/be/src/vec/exec/scan/file_scanner.cpp
index 116fcbd33e7..476eae26ebe 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -543,6 +543,9 @@ Status FileScanner::_check_output_block_types() {
Status FileScanner::_init_src_block(Block* block) {
if (!_is_load) {
_src_block_ptr = block;
+
+ // todo: maybe do not need to build name to index map every time
+ _src_block_name_to_idx = block->get_name_to_pos_map();
return Status::OK();
}
RETURN_IF_ERROR(_check_output_block_types());
@@ -609,7 +612,7 @@ Status FileScanner::_cast_to_input_block(Block* block) {
// skip variant type
continue;
}
- auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
+ auto& arg =
_src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]);
auto return_type = slot_desc->get_data_type_ptr();
// remove nullable here, let the get_function decide whether nullable
auto data_type =
get_data_type_with_default_argument(remove_nullable(return_type));
@@ -637,7 +640,9 @@ Status FileScanner::_fill_columns_from_path(size_t rows) {
}
DataTypeSerDe::FormatOptions _text_formatOptions;
for (auto& kv : _partition_col_descs) {
- auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
+ auto doris_column =
+
_src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).column;
+ // _src_block_ptr points to a mutable block created by this class
itself, so const_cast can be used here.
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
auto& [value, slot_desc] = kv.second;
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
@@ -669,7 +674,8 @@ Status FileScanner::_fill_missing_columns(size_t rows) {
for (auto& kv : _missing_col_descs) {
if (kv.second == nullptr) {
// no default column, fill with null
- auto mutable_column =
_src_block_ptr->get_by_name(kv.first).column->assume_mutable();
+ auto mutable_column =
_src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first])
+ .column->assume_mutable();
auto* nullable_column =
static_cast<vectorized::ColumnNullable*>(mutable_column.get());
nullable_column->insert_many_defaults(rows);
} else {
@@ -689,10 +695,15 @@ Status FileScanner::_fill_missing_columns(size_t rows) {
mutable_column->resize(rows);
// result_column_ptr maybe a ColumnConst, convert it to a
normal column
result_column_ptr =
result_column_ptr->convert_to_full_column_if_const();
- auto origin_column_type =
_src_block_ptr->get_by_name(kv.first).type;
+ auto origin_column_type =
+
_src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).type;
bool is_nullable = origin_column_type->is_nullable();
+ if (!_src_block_name_to_idx.contains(kv.first)) {
+ return Status::InternalError("Column {} not found in src
block {}", kv.first,
+
_src_block_ptr->dump_structure());
+ }
_src_block_ptr->replace_by_position(
- _src_block_ptr->get_position_by_name(kv.first),
+ _src_block_name_to_idx[kv.first],
is_nullable ? make_nullable(result_column_ptr) :
result_column_ptr);
_src_block_ptr->erase(result_column_id);
}
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp
b/be/src/vec/exec/scan/olap_scanner.cpp
index 3f56d9e5be8..c0c0c35f0be 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -427,8 +427,8 @@ Status OlapScanner::_init_tablet_reader_params(
DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block",
DBUG_BLOCK);
if (!_state->skip_storage_engine_merge()) {
- TOlapScanNode& olap_scan_node =
-
((pipeline::OlapScanLocalState*)_local_state)->olap_scan_node();
+ auto* olap_scan_local_state =
(pipeline::OlapScanLocalState*)_local_state;
+ TOlapScanNode& olap_scan_node =
olap_scan_local_state->olap_scan_node();
// order by table keys optimization for topn
// will only read head/tail of data file since it's already sorted by
keys
if (olap_scan_node.__isset.sort_info &&
!olap_scan_node.sort_info.is_asc_order.empty()) {
@@ -440,16 +440,20 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.read_orderby_key_num_prefix_columns =
olap_scan_node.sort_info.is_asc_order.size();
_tablet_reader_params.read_orderby_key_limit = _limit;
- _tablet_reader_params.filter_block_conjuncts = _conjuncts;
+
+ if (_tablet_reader_params.read_orderby_key_limit > 0 &&
+ olap_scan_local_state->_storage_no_merge()) {
+ _tablet_reader_params.filter_block_conjuncts = _conjuncts;
+ _conjuncts.clear();
+ }
}
// set push down topn filter
_tablet_reader_params.topn_filter_source_node_ids =
- ((pipeline::OlapScanLocalState*)_local_state)
- ->get_topn_filter_source_node_ids(_state, true);
+ olap_scan_local_state->get_topn_filter_source_node_ids(_state,
true);
if (!_tablet_reader_params.topn_filter_source_node_ids.empty()) {
_tablet_reader_params.topn_filter_target_node_id =
-
((pipeline::OlapScanLocalState*)_local_state)->parent()->node_id();
+ olap_scan_local_state->parent()->node_id();
}
}
diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp
index c46c4f4a90f..c9391960c8e 100644
--- a/be/src/vec/exec/scan/scanner.cpp
+++ b/be/src/vec/exec/scan/scanner.cpp
@@ -113,8 +113,6 @@ Status Scanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
RETURN_IF_ERROR(_get_block_impl(state, block, eof));
if (*eof) {
DCHECK(block->rows() == 0);
- // clear TEMP columns to avoid column align problem
- block->erase_tmp_columns();
break;
}
_num_rows_read += block->rows();
@@ -145,11 +143,6 @@ Status Scanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
}
Status Scanner::_filter_output_block(Block* block) {
- Defer clear_tmp_block([&]() { block->erase_tmp_columns(); });
- if (block->has(BeConsts::BLOCK_TEMP_COLUMN_SCANNER_FILTERED)) {
- // scanner filter_block is already done (only by _topn_next
currently), just skip it
- return Status::OK();
- }
auto old_rows = block->rows();
Status st = VExprContext::filter_block(_conjuncts, block,
block->columns());
_counter.num_rows_unselected += old_rows - block->rows();
diff --git a/be/src/vec/functions/function_helpers.cpp
b/be/src/vec/functions/function_helpers.cpp
index 6862e9addb9..877e47b5318 100644
--- a/be/src/vec/functions/function_helpers.cpp
+++ b/be/src/vec/functions/function_helpers.cpp
@@ -97,14 +97,6 @@ std::tuple<Block, ColumnNumbers>
create_block_with_nested_columns(const Block& b
}
}
- // TODO: only support match function, rethink the logic
- for (const auto& ctn : block) {
- if (ctn.name.size() > BeConsts::BLOCK_TEMP_COLUMN_PREFIX.size() &&
- starts_with(ctn.name, BeConsts::BLOCK_TEMP_COLUMN_PREFIX)) {
- res.insert(ctn);
- }
- }
-
return {std::move(res), std::move(res_args)};
}
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 7f51bf7c1f0..a3186d2d982 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -411,7 +411,6 @@ Status BlockReader::_unique_key_next_block(Block* block,
bool* eof) {
block->insert(column_with_type_and_name);
RETURN_IF_ERROR(Block::filter_block(block, target_columns_size,
target_columns_size));
_stats.rows_del_filtered += target_block_row - block->rows();
- DCHECK(block->try_get_by_name("__DORIS_COMPACTION_FILTER__") ==
nullptr);
if (UNLIKELY(_reader_context.record_rowids)) {
DCHECK_EQ(_block_row_locations.size(), block->rows() +
delete_count);
}
diff --git a/be/src/vec/olap/vcollect_iterator.cpp
b/be/src/vec/olap/vcollect_iterator.cpp
index 3f375eb2c82..deb384de0ba 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -93,6 +93,7 @@ void VCollectIterator::init(TabletReader* reader, bool
ori_data_overlapping, boo
_topn_limit = _reader->_reader_context.read_orderby_key_limit;
} else {
_topn_limit = 0;
+ DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0);
}
}
@@ -259,8 +260,6 @@ Status VCollectIterator::_topn_next(Block* block) {
return Status::Error<END_OF_FILE>("");
}
- // clear TEMP columns to avoid column align problem
- block->erase_tmp_columns();
auto clone_block = block->clone_empty();
/*
select id, "${tR2}",
@@ -316,8 +315,6 @@ Status VCollectIterator::_topn_next(Block* block) {
if (status.is<END_OF_FILE>()) {
eof = true;
if (block->rows() == 0) {
- // clear TEMP columns to avoid column align problem in
segment iterator
- block->erase_tmp_columns();
break;
}
} else {
@@ -328,8 +325,6 @@ Status VCollectIterator::_topn_next(Block* block) {
// filter block
RETURN_IF_ERROR(VExprContext::filter_block(
_reader->_reader_context.filter_block_conjuncts, block,
block->columns()));
- // clear TMPE columns to avoid column align problem in
mutable_block.add_rows bellow
- block->erase_tmp_columns();
// update read rows
read_rows += block->rows();
@@ -452,12 +447,6 @@ Status VCollectIterator::_topn_next(Block* block) {
<< " sorted_row_pos.size()=" << sorted_row_pos.size()
<< " mutable_block.rows()=" << mutable_block.rows();
*block = mutable_block.to_block();
- // append a column to indicate scanner filter_block is already done
- auto filtered_datatype = std::make_shared<DataTypeUInt8>();
- auto filtered_column = filtered_datatype->create_column_const(
- block->rows(), Field::create_field<TYPE_BOOLEAN>(1));
- block->insert(
- {filtered_column, filtered_datatype,
BeConsts::BLOCK_TEMP_COLUMN_SCANNER_FILTERED});
_topn_eof = true;
return block->rows() > 0 ? Status::OK() : Status::Error<END_OF_FILE>("");
@@ -894,8 +883,6 @@ Status
VCollectIterator::Level1Iterator::_normal_next(Block* block) {
while (res.is<END_OF_FILE>() && !_children.empty()) {
_cur_child = std::move(*(_children.begin()));
_children.pop_front();
- // clear TEMP columns to avoid column align problem
- block->erase_tmp_columns();
res = _cur_child->next(block);
}
diff --git a/be/src/vec/olap/vertical_block_reader.cpp
b/be/src/vec/olap/vertical_block_reader.cpp
index 5f6e376367d..24bdf4e87a6 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -515,7 +515,6 @@ Status VerticalBlockReader::_unique_key_next_block(Block*
block, bool* eof) {
RETURN_IF_ERROR(
Block::filter_block(block, target_columns.size(),
target_columns.size()));
_stats.rows_del_filtered += block_rows - block->rows();
- DCHECK(block->try_get_by_name("__DORIS_COMPACTION_FILTER__") ==
nullptr);
if (UNLIKELY(_reader_context.record_rowids)) {
DCHECK_EQ(_block_row_locations.size(), block->rows() +
delete_count);
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 9cd8d4bb8d9..e6ff6f1a8a8 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -101,8 +101,8 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block*
block, bool* eos) {
RETURN_IF_ERROR(block_item.get_block(next_block));
size_t block_byte_size = block_item.block_byte_size();
COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer,
block_item.deserialize_time());
- COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
- COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
+ COUNTER_UPDATE(_recvr->_decompress_timer, block_item.decompress_time());
+ COUNTER_UPDATE(_recvr->_decompress_bytes, block_item.decompress_bytes());
_recvr->_memory_used_counter->update(-(int64_t)block_byte_size);
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
sub_blocks_memory_usage(block_byte_size);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 1d752b1bad8..1a1c84f3e67 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -271,7 +271,8 @@ protected:
DCHECK(_pblock);
SCOPED_RAW_TIMER(&_deserialize_time);
_block = Block::create_unique();
-
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_block->deserialize(*_pblock));
+ RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
+ _block->deserialize(*_pblock, &_decompress_bytes,
&_decompress_time));
}
block.swap(_block);
_block.reset();
@@ -280,6 +281,8 @@ protected:
size_t block_byte_size() const { return _block_byte_size; }
int64_t deserialize_time() const { return _deserialize_time; }
+ int64_t decompress_time() const { return _decompress_time; }
+ size_t decompress_bytes() const { return _decompress_bytes; }
BlockItem() = default;
BlockItem(BlockUPtr&& block, size_t block_byte_size)
: _block(std::move(block)), _block_byte_size(block_byte_size)
{}
@@ -292,6 +295,8 @@ protected:
std::unique_ptr<PBlock> _pblock;
size_t _block_byte_size = 0;
int64_t _deserialize_time = 0;
+ int64_t _decompress_time = 0;
+ size_t _decompress_bytes = 0;
};
std::list<BlockItem> _block_queue;
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index 9105ba9e057..b8280785546 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -47,10 +47,12 @@ Status GetArrowResultBatchCtx::on_data(const
std::shared_ptr<vectorized::Block>&
if (_result != nullptr) {
auto* arrow_buffer =
assert_cast<ArrowFlightResultBlockBuffer*>(buffer);
size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ int64_t compressed_time = 0;
SCOPED_TIMER(arrow_buffer->_serialize_batch_ns_timer);
- RETURN_IF_ERROR(block->serialize(
- arrow_buffer->_be_exec_version, _result->mutable_block(),
&uncompressed_bytes,
- &compressed_bytes,
arrow_buffer->_fragment_transmission_compression_type, false));
+ RETURN_IF_ERROR(block->serialize(arrow_buffer->_be_exec_version,
_result->mutable_block(),
+ &uncompressed_bytes,
&compressed_bytes, &compressed_time,
+
arrow_buffer->_fragment_transmission_compression_type,
+ false));
COUNTER_UPDATE(arrow_buffer->_uncompressed_bytes_counter,
uncompressed_bytes);
COUNTER_UPDATE(arrow_buffer->_compressed_bytes_counter,
compressed_bytes);
_result->set_packet_seq(packet_seq);
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index b4af185adb3..a05ce66ff98 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -347,12 +347,13 @@ Status BlockSerializer::serialize_block(const Block* src,
PBlock* dest, size_t n
SCOPED_TIMER(_parent->_serialize_batch_timer);
dest->Clear();
size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ int64_t compress_time = 0;
RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest,
&uncompressed_bytes,
- &compressed_bytes,
_parent->compression_type(),
+ &compressed_bytes, &compress_time,
_parent->compression_type(),
_parent->transfer_large_data_by_brpc()));
COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes *
num_receivers);
COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes *
num_receivers);
- COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
+ COUNTER_UPDATE(_parent->_compress_timer, compress_time);
#ifndef BE_TEST
_parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_bytes(
compressed_bytes * num_receivers);
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp
b/be/src/vec/sink/vtablet_block_convertor.cpp
index e720b5f694e..5bb8f2cbe0f 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -689,10 +689,6 @@ Status
OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si
Status
OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(vectorized::Block*
block,
size_t
rows) {
- // avoid duplicate PARTIAL_UPDATE_AUTO_INC_COL
- if (block->has(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL)) {
- return Status::OK();
- }
auto dst_column = vectorized::ColumnInt64::create();
vectorized::ColumnInt64::Container& dst_values = dst_column->get_data();
size_t null_value_count = rows;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index d7a269287af..fe51df4dcb2 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -866,8 +866,9 @@ void VNodeChannel::try_send_pending_block(RuntimeState*
state) {
if (block.rows() > 0) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ int64_t compressed_time = 0;
Status st = block.serialize(state->be_exec_version(),
request->mutable_block(),
- &uncompressed_bytes, &compressed_bytes,
+ &uncompressed_bytes, &compressed_bytes,
&compressed_time,
state->fragement_transmission_compression_type(),
_parent->_transfer_large_data_by_brpc);
TEST_INJECTION_POINT_CALLBACK("VNodeChannel::try_send_block", &st);
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp
b/be/src/vec/sink/writer/vwal_writer.cpp
index 5a64b6f7c43..35436da82f1 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -73,8 +73,9 @@ Status VWalWriter::write_wal(vectorized::Block* block) {
{ return Status::InternalError("Failed to write wal!"); });
PBlock pblock;
size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ int64_t compressed_time = 0;
RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock,
&uncompressed_bytes,
- &compressed_bytes,
+ &compressed_bytes, &compressed_time,
segment_v2::CompressionTypePB::NO_COMPRESSION));
RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*>
{&pblock}));
return Status::OK();
diff --git a/be/src/vec/spill/spill_reader.cpp
b/be/src/vec/spill/spill_reader.cpp
index de3eea6b625..bb74d20ec33 100644
--- a/be/src/vec/spill/spill_reader.cpp
+++ b/be/src/vec/spill/spill_reader.cpp
@@ -142,7 +142,9 @@ Status SpillReader::read(Block* block, bool* eos) {
if (!pb_block_.ParseFromArray(result.data,
cast_set<int>(result.size))) {
return Status::InternalError("Failed to read spilled block");
}
- RETURN_IF_ERROR(block->deserialize(pb_block_));
+ size_t uncompressed_size = 0;
+ int64_t uncompressed_time = 0;
+ RETURN_IF_ERROR(block->deserialize(pb_block_, &uncompressed_size,
&uncompressed_time));
}
COUNTER_UPDATE(_read_block_data_size, block->bytes());
COUNTER_UPDATE(_read_rows_count, block->rows());
diff --git a/be/src/vec/spill/spill_writer.cpp
b/be/src/vec/spill/spill_writer.cpp
index a43ba83ccb9..a43162f43ce 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -115,9 +115,10 @@ Status SpillWriter::_write_internal(const Block& block,
size_t& written_bytes) {
{
PBlock pblock;
SCOPED_TIMER(_serialize_timer);
+ int64_t compressed_time = 0;
status = block.serialize(
BeExecVersionManager::get_newest_version(), &pblock,
&uncompressed_bytes,
- &compressed_bytes,
+ &compressed_bytes, &compressed_time,
segment_v2::CompressionTypePB::ZSTD); // ZSTD for better
compression ratio
RETURN_IF_ERROR(status);
int64_t pblock_mem = pblock.ByteSizeLong();
diff --git a/be/test/olap/wal/wal_reader_writer_test.cpp
b/be/test/olap/wal/wal_reader_writer_test.cpp
index 94fc8ff91a2..d443cdfa849 100644
--- a/be/test/olap/wal/wal_reader_writer_test.cpp
+++ b/be/test/olap/wal/wal_reader_writer_test.cpp
@@ -64,8 +64,10 @@ void covert_block_to_pb(
segment_v2::CompressionTypePB compression_type =
segment_v2::CompressionTypePB::SNAPPY) {
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
- Status st = block.serialize(BeExecVersionManager::get_newest_version(),
pblock,
- &uncompressed_bytes, &compressed_bytes,
compression_type);
+ int64_t compressed_time = 0;
+ Status st =
+ block.serialize(BeExecVersionManager::get_newest_version(),
pblock, &uncompressed_bytes,
+ &compressed_bytes, &compressed_time,
compression_type);
EXPECT_TRUE(st.ok());
EXPECT_TRUE(uncompressed_bytes >= compressed_bytes);
EXPECT_EQ(compressed_bytes, pblock->column_values().size());
@@ -132,7 +134,9 @@ TEST_F(WalReaderWriterTest, TestWriteAndRead1) {
break;
}
vectorized::Block block;
- EXPECT_TRUE(block.deserialize(pblock).ok());
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ EXPECT_TRUE(block.deserialize(pblock, &uncompress_size,
&uncompressed_time).ok());
EXPECT_EQ(block_rows, block.rows());
}
static_cast<void>(wal_reader.finalize());
diff --git a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
index bed8be3e4f3..0bf6691d7d0 100644
--- a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
+++ b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
@@ -318,8 +318,9 @@ class MockClosure : public google::protobuf::Closure {
void to_pblock(Block& block, PBlock* pblock) {
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
+ int64_t compressed_time = 0;
EXPECT_TRUE(block.serialize(BeExecVersionManager::get_newest_version(),
pblock,
- &uncompressed_bytes, &compressed_bytes,
+ &uncompressed_bytes, &compressed_bytes,
&compressed_time,
segment_v2::CompressionTypePB::NO_COMPRESSION));
}
diff --git a/be/test/pipeline/operator/materialization_shared_state_test.cpp
b/be/test/pipeline/operator/materialization_shared_state_test.cpp
index 51c097207f1..d96653b2dfe 100644
--- a/be/test/pipeline/operator/materialization_shared_state_test.cpp
+++ b/be/test/pipeline/operator/materialization_shared_state_test.cpp
@@ -119,8 +119,9 @@ TEST_F(MaterializationSharedStateTest,
TestMergeMultiResponse) {
auto serialized_block = response_.add_blocks()->mutable_block();
size_t uncompressed_size = 0;
size_t compressed_size = 0;
+ int64_t compress_time = 0;
auto s = resp_block1.serialize(0, serialized_block,
&uncompressed_size, &compressed_size,
- CompressionTypePB::LZ4);
+ &compress_time, CompressionTypePB::LZ4);
EXPECT_TRUE(s.ok());
_shared_state->rpc_struct_map[_backend_id1].response =
std::move(response_);
@@ -141,8 +142,9 @@ TEST_F(MaterializationSharedStateTest,
TestMergeMultiResponse) {
auto serialized_block = response_.add_blocks()->mutable_block();
size_t uncompressed_size = 0;
size_t compressed_size = 0;
+ int64_t compress_time = 0;
auto s = resp_block2.serialize(0, serialized_block,
&uncompressed_size, &compressed_size,
- CompressionTypePB::LZ4);
+ &compress_time, CompressionTypePB::LZ4);
EXPECT_TRUE(s.ok());
_shared_state->rpc_struct_map[_backend_id2].response =
std::move(response_);
@@ -219,8 +221,9 @@ TEST_F(MaterializationSharedStateTest,
TestMergeMultiResponseMultiBlocks) {
auto serialized_block = response_.add_blocks()->mutable_block();
size_t uncompressed_size = 0;
size_t compressed_size = 0;
+ int64_t compress_time = 0;
auto s = resp_block1.serialize(0, serialized_block,
&uncompressed_size, &compressed_size,
- CompressionTypePB::LZ4);
+ &compress_time, CompressionTypePB::LZ4);
EXPECT_TRUE(s.ok());
_shared_state->rpc_struct_map[_backend_id1].response =
std::move(response_);
@@ -240,8 +243,9 @@ TEST_F(MaterializationSharedStateTest,
TestMergeMultiResponseMultiBlocks) {
auto serialized_block = response_.add_blocks()->mutable_block();
size_t uncompressed_size = 0;
size_t compressed_size = 0;
+ int64_t compress_time = 0;
auto s = resp_block2.serialize(0, serialized_block,
&uncompressed_size, &compressed_size,
- CompressionTypePB::LZ4);
+ &compress_time, CompressionTypePB::LZ4);
EXPECT_TRUE(s.ok());
_shared_state->rpc_struct_map[_backend_id2].response =
std::move(response_);
@@ -260,8 +264,9 @@ TEST_F(MaterializationSharedStateTest,
TestMergeMultiResponseMultiBlocks) {
_shared_state->rpc_struct_map[_backend_id1].response.add_blocks()->mutable_block();
size_t uncompressed_size = 0;
size_t compressed_size = 0;
+ int64_t compress_time = 0;
auto s = resp_block1.serialize(0, serialized_block,
&uncompressed_size, &compressed_size,
- CompressionTypePB::LZ4);
+ &compress_time, CompressionTypePB::LZ4);
EXPECT_TRUE(s.ok());
_shared_state->response_blocks[1] = resp_block1.clone_empty();
}
@@ -278,8 +283,9 @@ TEST_F(MaterializationSharedStateTest,
TestMergeMultiResponseMultiBlocks) {
_shared_state->rpc_struct_map[_backend_id2].response.add_blocks()->mutable_block();
size_t uncompressed_size = 0;
size_t compressed_size = 0;
+ int64_t compress_time = 0;
auto s = resp_block2.serialize(0, serialized_block,
&uncompressed_size, &compressed_size,
- CompressionTypePB::LZ4);
+ &compress_time, CompressionTypePB::LZ4);
EXPECT_TRUE(s.ok());
}
diff --git a/be/test/testutil/mock/mock_data_stream_sender.h
b/be/test/testutil/mock/mock_data_stream_sender.h
index dc9d2542beb..cd076cca521 100644
--- a/be/test/testutil/mock/mock_data_stream_sender.h
+++ b/be/test/testutil/mock/mock_data_stream_sender.h
@@ -54,7 +54,10 @@ struct MockChannel : public Channel {
return Status::OK();
}
Block nblock;
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(nblock.deserialize(*_pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
+ nblock.deserialize(*_pblock, &uncompress_size,
&uncompressed_time));
if (!nblock.empty()) {
RETURN_IF_ERROR(_send_block.merge(std::move(nblock)));
}
@@ -64,7 +67,10 @@ struct MockChannel : public Channel {
Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos = false) override {
Block nblock;
-
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(nblock.deserialize(*block->get_block()));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
+ nblock.deserialize(*block->get_block(), &uncompress_size,
&uncompressed_time));
if (!nblock.empty()) {
RETURN_IF_ERROR(_send_block.merge(std::move(nblock)));
}
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 07e4254c7fc..9b316baca60 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -74,8 +74,10 @@ void block_to_pb(
segment_v2::CompressionTypePB compression_type =
segment_v2::CompressionTypePB::SNAPPY) {
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
- Status st = block.serialize(BeExecVersionManager::get_newest_version(),
pblock,
- &uncompressed_bytes, &compressed_bytes,
compression_type);
+ int64_t compress_time = 0;
+ Status st =
+ block.serialize(BeExecVersionManager::get_newest_version(),
pblock, &uncompressed_bytes,
+ &compressed_bytes, &compress_time,
compression_type);
EXPECT_TRUE(st.ok());
// const column maybe uncompressed_bytes<compressed_bytes
// as the serialize_bytes add some additional byets:
STREAMVBYTE_PADDING=16;
@@ -147,7 +149,9 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size,
&uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -169,7 +173,9 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size,
&uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -192,7 +198,9 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size,
&uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -219,7 +227,9 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size,
&uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -240,7 +250,9 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size,
&uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -263,7 +275,9 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size,
&uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -287,7 +301,9 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size,
&uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -303,7 +319,9 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size,
&uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -328,7 +346,9 @@ void serialize_and_deserialize_test_one() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size,
&uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
std::string s2 = pblock2.DebugString();
@@ -354,7 +374,9 @@ void serialize_and_deserialize_test_int() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
std::string s2 = pblock2.DebugString();
@@ -378,7 +400,9 @@ void serialize_and_deserialize_test_int() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
std::string s2 = pblock2.DebugString();
@@ -403,7 +427,9 @@ void serialize_and_deserialize_test_long() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
std::string s2 = pblock2.DebugString();
@@ -427,7 +453,9 @@ void serialize_and_deserialize_test_long() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
std::string s2 = pblock2.DebugString();
@@ -452,7 +480,9 @@ void serialize_and_deserialize_test_string() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY);
std::string s2 = pblock2.DebugString();
@@ -477,7 +507,9 @@ void serialize_and_deserialize_test_string() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY);
std::string s2 = pblock2.DebugString();
@@ -505,7 +537,9 @@ void serialize_and_deserialize_test_nullable() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
std::string s2 = pblock2.DebugString();
@@ -530,7 +564,9 @@ void serialize_and_deserialize_test_nullable() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
std::string s2 = pblock2.DebugString();
@@ -554,7 +590,9 @@ void serialize_and_deserialize_test_nullable() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY);
std::string s2 = pblock2.DebugString();
@@ -576,7 +614,9 @@ void serialize_and_deserialize_test_nullable() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY);
std::string s2 = pblock2.DebugString();
@@ -602,7 +642,9 @@ void serialize_and_deserialize_test_decimal() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
std::string s2 = pblock2.DebugString();
@@ -626,7 +668,9 @@ void serialize_and_deserialize_test_decimal() {
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
std::string s2 = pblock2.DebugString();
@@ -658,7 +702,9 @@ void serialize_and_deserialize_test_bitmap() {
std::string s1 = pblock.DebugString();
std::string bb1 = block.dump_data(0, 1024);
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
std::string bb2 = block2.dump_data(0, 1024);
EXPECT_EQ(bb1, bb2);
PBlock pblock2;
@@ -688,7 +734,9 @@ void serialize_and_deserialize_test_bitmap() {
std::string s1 = pblock.DebugString();
std::string bb1 = block.dump_data(0, 1024);
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
std::string bb2 = block2.dump_data(0, 1024);
EXPECT_EQ(bb1, bb2);
EXPECT_EQ(block.dump_data_json(0, 1024), block2.dump_data_json(0, 1024));
@@ -709,7 +757,9 @@ void serialize_and_deserialize_test_array() {
block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY);
std::string s1 = pblock.DebugString();
vectorized::Block block2;
- static_cast<void>(block2.deserialize(pblock));
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ static_cast<void>(block2.deserialize(pblock, &uncompress_size, &uncompressed_time));
PBlock pblock2;
block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY);
std::string s2 = pblock2.DebugString();
@@ -1075,26 +1125,6 @@ TEST(BlockTest, ctor) {
ASSERT_EQ(block.columns(), 2);
ASSERT_EQ(block.get_by_position(0).type->get_primitive_type(), TYPE_INT);
ASSERT_TRUE(block.get_by_position(1).type->is_nullable());
-
- {
- auto mutable_block =
- vectorized::MutableBlock::create_unique(tbl->get_tuple_descs(), 10, false);
- ASSERT_EQ(mutable_block->columns(), 2);
- auto mutable_block2 = vectorized::MutableBlock::create_unique();
- mutable_block->swap(*mutable_block2);
- ASSERT_EQ(mutable_block->columns(), 0);
- ASSERT_EQ(mutable_block2->columns(), 2);
- }
-
- {
- auto mutable_block =
- vectorized::MutableBlock::create_unique(tbl->get_tuple_descs(), 10, true);
- ASSERT_EQ(mutable_block->columns(), 1);
- auto mutable_block2 = vectorized::MutableBlock::create_unique();
- mutable_block->swap(*mutable_block2);
- ASSERT_EQ(mutable_block->columns(), 0);
- ASSERT_EQ(mutable_block2->columns(), 1);
- }
}
TEST(BlockTest, insert_erase) {
@@ -1125,39 +1155,20 @@ TEST(BlockTest, insert_erase) {
block.erase_tail(0);
ASSERT_EQ(block.columns(), 0);
- EXPECT_ANY_THROW(block.erase("column"));
column_with_name =
vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeString>({});
block.insert(0, column_with_name);
- EXPECT_NO_THROW(block.erase("column"));
- ASSERT_EQ(block.columns(), 0);
-
- EXPECT_ANY_THROW(block.safe_get_by_position(0));
+ ASSERT_EQ(block.columns(), 1);
- ASSERT_EQ(block.try_get_by_name("column"), nullptr);
- EXPECT_ANY_THROW(block.get_by_name("column"));
- EXPECT_ANY_THROW(block.get_position_by_name("column"));
block.insert(0, column_with_name);
- EXPECT_NO_THROW(auto item = block.get_by_name("column"));
- ASSERT_NE(block.try_get_by_name("column"), nullptr);
EXPECT_EQ(block.get_position_by_name("column"), 0);
- block.insert({nullptr, nullptr, BeConsts::BLOCK_TEMP_COLUMN_PREFIX});
- EXPECT_NO_THROW(auto item = block.get_by_name(BeConsts::BLOCK_TEMP_COLUMN_PREFIX));
-
- block.erase_tmp_columns();
- ASSERT_EQ(block.try_get_by_name(BeConsts::BLOCK_TEMP_COLUMN_PREFIX), nullptr);
-
{
// test const block
const auto const_block = block;
- EXPECT_EQ(const_block.try_get_by_name("column2"), nullptr);
- EXPECT_ANY_THROW(const_block.get_by_name("column2"));
- EXPECT_ANY_THROW(const_block.get_position_by_name("column2"));
+ EXPECT_EQ(const_block.get_position_by_name("column2"), -1);
- EXPECT_NO_THROW(auto item = const_block.get_by_name("column"));
- ASSERT_NE(const_block.try_get_by_name("column"), nullptr);
EXPECT_EQ(const_block.get_position_by_name("column"), 0);
}
@@ -1166,14 +1177,7 @@ TEST(BlockTest, insert_erase) {
block.insert({nullptr, std::make_shared<vectorized::DataTypeString>(), "col2"});
- vectorized::MutableBlock mutable_block(&block);
- mutable_block.erase("col1");
- ASSERT_EQ(mutable_block.columns(), 2);
-
- EXPECT_ANY_THROW(mutable_block.erase("col1"));
- ASSERT_EQ(mutable_block.columns(), 2);
- mutable_block.erase("col2");
- ASSERT_EQ(mutable_block.columns(), 1);
+ ASSERT_EQ(block.columns(), 3);
}
TEST(BlockTest, check_number_of_rows) {
@@ -1352,8 +1356,6 @@ TEST(BlockTest, others) {
mutable_block.clear_column_data();
ASSERT_EQ(mutable_block.get_column_by_position(0)->size(), 0);
- ASSERT_TRUE(mutable_block.has("column"));
- ASSERT_EQ(mutable_block.get_position_by_name("column"), 0);
auto dumped_names = mutable_block.dump_names();
ASSERT_TRUE(dumped_names.find("column") != std::string::npos);
diff --git a/be/test/vec/data_types/common_data_type_test.h b/be/test/vec/data_types/common_data_type_test.h
index 4425dd31c94..745b3d19155 100644
--- a/be/test/vec/data_types/common_data_type_test.h
+++ b/be/test/vec/data_types/common_data_type_test.h
@@ -252,13 +252,16 @@ public:
auto pblock = std::make_unique<PBlock>();
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
+ int64_t compress_time = 0;
segment_v2::CompressionTypePB compression_type =
segment_v2::CompressionTypePB::ZSTD;
Status st = block->serialize(be_exec_version, pblock.get(), &uncompressed_bytes,
- &compressed_bytes, compression_type);
+ &compressed_bytes, &compress_time, compression_type);
ASSERT_EQ(st.ok(), true);
// deserialize
auto block_1 = std::make_shared<Block>();
- st = block_1->deserialize(*pblock);
+ size_t uncompress_size = 0;
+ int64_t uncompressed_time = 0;
+ st = block_1->deserialize(*pblock, &uncompress_size, &uncompressed_time);
ASSERT_EQ(st.ok(), true);
// check block_1 and block is same
for (auto col_idx = 0; col_idx < block->columns(); ++col_idx) {
diff --git a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp
index 9eec232f75f..08864a86fef 100644
--- a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp
@@ -172,8 +172,8 @@ static void read_parquet_lines(std::vector<std::string> numeric_types,
bool eof = false;
size_t read_row = 0;
static_cast<void>(p_reader->get_next_block(block.get(), &read_row, &eof));
- auto row_id_string_column =
- static_cast<const ColumnString&>(*block->get_by_name("row_id").column.get());
+ auto row_id_string_column = static_cast<const ColumnString&>(
+ *block->get_by_position(block->get_position_by_name("row_id")).column.get());
auto read_lines_tmp = read_lines;
for (auto i = 0; i < row_id_string_column.size(); i++) {
GlobalRowLoacationV2 info =
@@ -184,7 +184,7 @@ static void read_parquet_lines(std::vector<std::string> numeric_types,
EXPECT_EQ(info.backend_id, BackendOptions::get_backend_id());
EXPECT_EQ(info.version, IdManager::ID_VERSION);
}
- block->erase("row_id");
+ block->erase(block->get_position_by_name("row_id"));
EXPECT_EQ(block->dump_data(), block_dump);
std::cout << block->dump_data();
diff --git a/be/test/vec/exec/orc/orc_read_lines.cpp b/be/test/vec/exec/orc/orc_read_lines.cpp
index 7bdd529c7bd..c73d6604b06 100644
--- a/be/test/vec/exec/orc/orc_read_lines.cpp
+++ b/be/test/vec/exec/orc/orc_read_lines.cpp
@@ -157,8 +157,8 @@ static void read_orc_line(int64_t line, std::string block_dump) {
bool eof = false;
size_t read_row = 0;
static_cast<void>(reader->get_next_block(block.get(), &read_row, &eof));
- auto row_id_string_column =
- static_cast<const ColumnString&>(*block->get_by_name("row_id").column.get());
+ auto row_id_string_column = static_cast<const ColumnString&>(
+ *block->get_by_position(block->get_position_by_name("row_id")).column.get());
for (auto i = 0; i < row_id_string_column.size(); i++) {
GlobalRowLoacationV2 info =
*((GlobalRowLoacationV2*)row_id_string_column.get_data_at(i).data);
@@ -167,7 +167,7 @@ static void read_orc_line(int64_t line, std::string block_dump) {
EXPECT_EQ(info.backend_id, BackendOptions::get_backend_id());
EXPECT_EQ(info.version, IdManager::ID_VERSION);
}
- block->erase("row_id");
+ block->erase(block->get_position_by_name("row_id"));
std::cout << block->dump_data();
EXPECT_EQ(block->dump_data(), block_dump);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]