This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 573e5476dd [Opt](load) Speed up the vectorized load (#12146)
573e5476dd is described below
commit 573e5476ddcdaab943371529f2a3279082d746b2
Author: HappenLee <[email protected]>
AuthorDate: Wed Aug 31 16:23:36 2022 +0800
[Opt](load) Speed up the vectorized load (#12146)
* [Opt](load) Speed up the vectorized load
---
be/src/exec/base_scanner.cpp | 33 ++++++++++++++++--------
be/src/exec/base_scanner.h | 1 +
be/src/exec/text_converter.hpp | 22 ++++++++--------
be/src/vec/core/block.cpp | 5 +++-
be/src/vec/core/block.h | 2 +-
be/src/vec/exec/scan/new_olap_scanner.cpp | 4 +--
be/src/vec/exec/vbroker_scanner.cpp | 15 +++--------
be/src/vec/functions/function_cast.h | 40 +++++++++++++++--------------
be/src/vec/functions/function_helpers.cpp | 33 ++++++++++++++++++++++++
be/src/vec/functions/function_helpers.h | 7 +++++
be/test/vec/exec/vbroker_scan_node_test.cpp | 8 +++---
be/test/vec/exec/vbroker_scanner_test.cpp | 6 ++---
12 files changed, 112 insertions(+), 64 deletions(-)
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 5808d35021..b6e64e57b9 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -304,6 +304,7 @@ Status
BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
size_t rows = _src_block.rows();
auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
auto& filter_map = filter_column->get_data();
+ auto origin_column_num = _src_block.columns();
for (auto slot_desc : _dest_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
@@ -315,7 +316,12 @@ Status
BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
- auto column_ptr = _src_block.get_by_position(result_column_id).column;
+ bool is_origin_column = result_column_id < origin_column_num;
+ auto column_ptr =
+ is_origin_column && _src_block_mem_reuse
+ ?
_src_block.get_by_position(result_column_id).column->clone_resized(rows)
+ : _src_block.get_by_position(result_column_id).column;
+
DCHECK(column_ptr != nullptr);
// because of src_slot_desc is always be nullable, so the column_ptr
after do dest_expr
@@ -373,7 +379,11 @@ Status
BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
}
// after do the dest block insert operation, clear _src_block to remove
the reference of origin column
- _src_block.clear();
+ if (_src_block_mem_reuse) {
+ _src_block.clear_column_data(origin_column_num);
+ } else {
+ _src_block.clear();
+ }
size_t dest_size = dest_block->columns();
// do filter
@@ -389,15 +399,18 @@ Status
BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
// TODO: opt the reuse of src_block or dest_block column. some case we have to
// shallow copy the column of src_block to dest block
Status BaseScanner::_init_src_block() {
- DCHECK(_src_block.columns() == 0);
- for (auto i = 0; i < _num_of_columns_from_file; ++i) {
- SlotDescriptor* slot_desc = _src_slot_descs[i];
- if (slot_desc == nullptr) {
- continue;
+ if (_src_block.is_empty_column()) {
+ for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+ SlotDescriptor* slot_desc = _src_slot_descs[i];
+ if (slot_desc == nullptr) {
+ continue;
+ }
+ auto data_type = slot_desc->get_data_type_ptr();
+ auto column_ptr = data_type->create_column();
+ column_ptr->reserve(_state->batch_size());
+
_src_block.insert(vectorized::ColumnWithTypeAndName(std::move(column_ptr),
data_type,
+
slot_desc->col_name()));
}
- auto data_type = slot_desc->get_data_type_ptr();
- _src_block.insert(vectorized::ColumnWithTypeAndName(
- data_type->create_column(), slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
return Status::OK();
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index b59fa113a2..37406cd440 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -142,6 +142,7 @@ protected:
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
vectorized::Block _src_block;
+ bool _src_block_mem_reuse = false;
int _num_of_columns_from_file;
// slot_ids for parquet predicate push down are in tuple desc
diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp
index 8baa90b6d5..5db043dc48 100644
--- a/be/src/exec/text_converter.hpp
+++ b/be/src/exec/text_converter.hpp
@@ -169,19 +169,17 @@ inline bool TextConverter::write_slot(const
SlotDescriptor* slot_desc, Tuple* tu
inline void TextConverter::write_string_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr*
column_ptr,
const char* data, size_t len) {
- vectorized::IColumn* col_ptr = column_ptr->get();
- // \N means it's NULL
- if (LIKELY(slot_desc->is_nullable())) {
- auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get());
- if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len ==
SQL_NULL_DATA) {
- nullable_column->insert_data(nullptr, 0);
- return;
- } else {
- nullable_column->get_null_map_data().push_back(0);
- col_ptr = &nullable_column->get_nested_column();
- }
+ DCHECK(column_ptr->get()->is_nullable());
+ auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get());
+ if (len == 2 && data[0] == '\\' && data[1] == 'N') {
+ nullable_column->get_null_map_data().push_back(1);
+
reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column())
+ .insert_default();
+ } else {
+ nullable_column->get_null_map_data().push_back(0);
+
reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column())
+ .insert_data(data, len);
}
- reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data,
len);
}
inline bool TextConverter::write_column(const SlotDescriptor* slot_desc,
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 5c30a78a3a..18b63a0975 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -918,11 +918,14 @@ void Block::deep_copy_slot(void* dst, MemPool* pool,
const doris::TypeDescriptor
}
}
-MutableBlock::MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs) {
+MutableBlock::MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs,
int reserve_size) {
for (auto tuple_desc : tuple_descs) {
for (auto slot_desc : tuple_desc->slots()) {
_data_types.emplace_back(slot_desc->get_data_type_ptr());
_columns.emplace_back(_data_types.back()->create_column());
+ if (reserve_size != 0) {
+ _columns.back()->reserve(reserve_size);
+ }
}
}
}
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 1d7ffb6a22..5fe90fdf6d 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -375,7 +375,7 @@ public:
MutableBlock() = default;
~MutableBlock() = default;
- MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs);
+ MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, int
reserve_size = 0);
MutableBlock(Block* block)
: _columns(block->mutate_columns()),
_data_types(block->get_data_types()) {}
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 0b25890a50..7b080d18af 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -175,8 +175,8 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.delete_predicates.begin()));
// Merge the columns in delete predicate that not in latest schema in to
current tablet schema
- for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) {
-
_tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version()));
+ for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
+
_tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_pb->version()));
}
// Range
diff --git a/be/src/vec/exec/vbroker_scanner.cpp
b/be/src/vec/exec/vbroker_scanner.cpp
index 41d0dcac1d..01a9f3255e 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -42,6 +42,7 @@ VBrokerScanner::VBrokerScanner(RuntimeState* state,
RuntimeProfile* profile,
: BrokerScanner(state, profile, params, ranges, broker_addresses,
pre_filter_texprs,
counter) {
_text_converter.reset(new (std::nothrow) TextConverter('\\'));
+ _src_block_mem_reuse = true;
}
VBrokerScanner::~VBrokerScanner() = default;
@@ -81,6 +82,7 @@ Status VBrokerScanner::get_next(Block* output_block, bool*
eof) {
}
}
}
+ columns.clear();
return _fill_dest_block(output_block, eof);
}
@@ -88,7 +90,7 @@ Status VBrokerScanner::get_next(Block* output_block, bool*
eof) {
Status VBrokerScanner::_fill_dest_columns(const Slice& line,
std::vector<MutableColumnPtr>&
columns) {
RETURN_IF_ERROR(_line_split_to_values(line));
- if (!_success) {
+ if (UNLIKELY(!_success)) {
// If not success, which means we met an invalid row, return.
return Status::OK();
}
@@ -98,19 +100,8 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
int dest_index = idx++;
auto src_slot_desc = _src_slot_descs[i];
- if (!src_slot_desc->is_materialized()) {
- continue;
- }
const Slice& value = _split_values[i];
- if (is_null(value)) {
- // nullable
- auto* nullable_column =
-
reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get());
- nullable_column->insert_default();
- continue;
- }
-
_text_converter->write_string_column(src_slot_desc,
&columns[dest_index], value.data,
value.size);
}
diff --git a/be/src/vec/functions/function_cast.h
b/be/src/vec/functions/function_cast.h
index 0ed17714b4..9a34a7b1b5 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -1280,31 +1280,33 @@ private:
const auto& nested_type = nullable_type.get_nested_type();
Block tmp_block;
- if (source_is_nullable)
- tmp_block = create_block_with_nested_columns(block,
arguments);
- else
- tmp_block = block;
+ if (source_is_nullable) {
+ tmp_block =
create_block_with_nested_columns_only_args(block, arguments);
+ size_t tmp_res_index = tmp_block.columns();
+ tmp_block.insert({nullptr, nested_type, ""});
+
+ /// Perform the requested conversion.
+ RETURN_IF_ERROR(
+ wrapper(context, tmp_block, {0}, tmp_res_index,
input_rows_count));
+
+ const auto& tmp_res =
tmp_block.get_by_position(tmp_res_index);
- size_t tmp_res_index = block.columns();
- tmp_block.insert({nullptr, nested_type, ""});
+ res.column = wrap_in_nullable(
+ tmp_res.column,
Block({block.get_by_position(arguments[0]), tmp_res}),
+ {0}, 1, input_rows_count);
+ } else {
+ tmp_block = block;
- /// Perform the requested conversion.
- RETURN_IF_ERROR(
- wrapper(context, tmp_block, arguments, tmp_res_index,
input_rows_count));
+ size_t tmp_res_index = block.columns();
+ tmp_block.insert({nullptr, nested_type, ""});
- const auto& tmp_res = tmp_block.get_by_position(tmp_res_index);
+ /// Perform the requested conversion.
+ RETURN_IF_ERROR(wrapper(context, tmp_block, arguments,
tmp_res_index,
+ input_rows_count));
- /// May happen in fuzzy tests. For debug purpose.
- if (!tmp_res.column.get()) {
- return Status::RuntimeError(
- "Couldn't convert {} to {} in
prepare_remove_nullable wrapper.",
-
block.get_by_position(arguments[0]).type->get_name(),
- nested_type->get_name());
+ res.column =
tmp_block.get_by_position(tmp_res_index).column;
}
- res.column = wrap_in_nullable(tmp_res.column,
-
Block({block.get_by_position(arguments[0]), tmp_res}),
- {0}, 1, input_rows_count);
return Status::OK();
};
} else if (source_is_nullable) {
diff --git a/be/src/vec/functions/function_helpers.cpp
b/be/src/vec/functions/function_helpers.cpp
index 0e98d88ee0..fcfdd4b3a2 100644
--- a/be/src/vec/functions/function_helpers.cpp
+++ b/be/src/vec/functions/function_helpers.cpp
@@ -26,6 +26,39 @@
namespace doris::vectorized {
+Block create_block_with_nested_columns_only_args(const Block& block, const
ColumnNumbers& args) {
+ std::set<size_t> args_set(args.begin(), args.end());
+ Block res;
+
+ for (auto i : args_set) {
+ const auto& col = block.get_by_position(i);
+
+ if (col.type->is_nullable()) {
+ const DataTypePtr& nested_type =
+ static_cast<const
DataTypeNullable&>(*col.type).get_nested_type();
+
+ if (!col.column) {
+ res.insert({nullptr, nested_type, col.name});
+ } else if (auto* nullable =
check_and_get_column<ColumnNullable>(*col.column)) {
+ const auto& nested_col = nullable->get_nested_column_ptr();
+ res.insert({nested_col, nested_type, col.name});
+ } else if (auto* const_column =
check_and_get_column<ColumnConst>(*col.column)) {
+ const auto& nested_col =
+
check_and_get_column<ColumnNullable>(const_column->get_data_column())
+ ->get_nested_column_ptr();
+ res.insert({ColumnConst::create(nested_col,
col.column->size()), nested_type,
+ col.name});
+ } else {
+ LOG(FATAL) << "Illegal column for DataTypeNullable";
+ }
+ } else {
+ res.insert(col);
+ }
+ }
+
+ return res;
+}
+
static Block create_block_with_nested_columns_impl(const Block& block,
const
std::unordered_set<size_t>& args) {
Block res;
diff --git a/be/src/vec/functions/function_helpers.h
b/be/src/vec/functions/function_helpers.h
index ac6601b06e..e1c1b192bb 100644
--- a/be/src/vec/functions/function_helpers.h
+++ b/be/src/vec/functions/function_helpers.h
@@ -95,6 +95,13 @@ Block create_block_with_nested_columns(const Block& block,
const ColumnNumbers&
Block create_block_with_nested_columns(const Block& block, const
ColumnNumbers& args,
size_t result);
+/// Returns a new block that contains only the columns listed in the
+/// "arguments" parameter; each such column is replaced with its respective
+/// nested column if it is nullable.
+/// TODO: the old function `create_block_with_nested_columns` has a performance
problem; replace all
+/// uses of it with this function and delete the old one.
+Block create_block_with_nested_columns_only_args(const Block& block, const
ColumnNumbers& args);
+
/// Checks argument type at specified index with predicate.
/// throws if there is no argument at specified index or if predicate returns
false.
void validate_argument_type(const IFunction& func, const DataTypes& arguments,
diff --git a/be/test/vec/exec/vbroker_scan_node_test.cpp
b/be/test/vec/exec/vbroker_scan_node_test.cpp
index 6cff164b10..195f468841 100644
--- a/be/test/vec/exec/vbroker_scan_node_test.cpp
+++ b/be/test/vec/exec/vbroker_scan_node_test.cpp
@@ -227,7 +227,7 @@ void VBrokerScanNodeTest::init_desc_table() {
slot_desc.columnPos = 0;
slot_desc.byteOffset = 0;
slot_desc.nullIndicatorByte = 0;
- slot_desc.nullIndicatorBit = -1;
+ slot_desc.nullIndicatorBit = 0;
slot_desc.colName = "k1";
slot_desc.slotIdx = 1;
slot_desc.isMaterialized = true;
@@ -254,7 +254,7 @@ void VBrokerScanNodeTest::init_desc_table() {
slot_desc.columnPos = 1;
slot_desc.byteOffset = 16;
slot_desc.nullIndicatorByte = 0;
- slot_desc.nullIndicatorBit = -1;
+ slot_desc.nullIndicatorBit = 1;
slot_desc.colName = "k2";
slot_desc.slotIdx = 2;
slot_desc.isMaterialized = true;
@@ -281,7 +281,7 @@ void VBrokerScanNodeTest::init_desc_table() {
slot_desc.columnPos = 2;
slot_desc.byteOffset = 32;
slot_desc.nullIndicatorByte = 0;
- slot_desc.nullIndicatorBit = -1;
+ slot_desc.nullIndicatorBit = 2;
slot_desc.colName = "k3";
slot_desc.slotIdx = 3;
slot_desc.isMaterialized = true;
@@ -308,7 +308,7 @@ void VBrokerScanNodeTest::init_desc_table() {
slot_desc.columnPos = 3;
slot_desc.byteOffset = 48;
slot_desc.nullIndicatorByte = 0;
- slot_desc.nullIndicatorBit = -1;
+ slot_desc.nullIndicatorBit = 3;
slot_desc.colName = "k4";
slot_desc.slotIdx = 4;
slot_desc.isMaterialized = true;
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp
b/be/test/vec/exec/vbroker_scanner_test.cpp
index 7824f10c30..5cb9afc4b2 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -209,7 +209,7 @@ void VBrokerScannerTest::init_desc_table() {
slot_desc.columnPos = 0;
slot_desc.byteOffset = 0;
slot_desc.nullIndicatorByte = 0;
- slot_desc.nullIndicatorBit = -1;
+ slot_desc.nullIndicatorBit = 0;
slot_desc.colName = "k1";
slot_desc.slotIdx = 1;
slot_desc.isMaterialized = true;
@@ -236,7 +236,7 @@ void VBrokerScannerTest::init_desc_table() {
slot_desc.columnPos = 1;
slot_desc.byteOffset = 16;
slot_desc.nullIndicatorByte = 0;
- slot_desc.nullIndicatorBit = -1;
+ slot_desc.nullIndicatorBit = 1;
slot_desc.colName = "k2";
slot_desc.slotIdx = 2;
slot_desc.isMaterialized = true;
@@ -263,7 +263,7 @@ void VBrokerScannerTest::init_desc_table() {
slot_desc.columnPos = 2;
slot_desc.byteOffset = 32;
slot_desc.nullIndicatorByte = 0;
- slot_desc.nullIndicatorBit = -1;
+ slot_desc.nullIndicatorBit = 2;
slot_desc.colName = "k3";
slot_desc.slotIdx = 3;
slot_desc.isMaterialized = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]