This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 573e5476dd [Opt](load) Speed up the vectorized load (#12146)
573e5476dd is described below

commit 573e5476ddcdaab943371529f2a3279082d746b2
Author: HappenLee <[email protected]>
AuthorDate: Wed Aug 31 16:23:36 2022 +0800

    [Opt](load) Speed up the vectorized load (#12146)
    
    * [Opt](load) Speed up the vectorized load
---
 be/src/exec/base_scanner.cpp                | 33 ++++++++++++++++--------
 be/src/exec/base_scanner.h                  |  1 +
 be/src/exec/text_converter.hpp              | 22 ++++++++--------
 be/src/vec/core/block.cpp                   |  5 +++-
 be/src/vec/core/block.h                     |  2 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp   |  4 +--
 be/src/vec/exec/vbroker_scanner.cpp         | 15 +++--------
 be/src/vec/functions/function_cast.h        | 40 +++++++++++++++--------------
 be/src/vec/functions/function_helpers.cpp   | 33 ++++++++++++++++++++++++
 be/src/vec/functions/function_helpers.h     |  7 +++++
 be/test/vec/exec/vbroker_scan_node_test.cpp |  8 +++---
 be/test/vec/exec/vbroker_scanner_test.cpp   |  6 ++---
 12 files changed, 112 insertions(+), 64 deletions(-)

diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 5808d35021..b6e64e57b9 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -304,6 +304,7 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
     size_t rows = _src_block.rows();
     auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
     auto& filter_map = filter_column->get_data();
+    auto origin_column_num = _src_block.columns();
 
     for (auto slot_desc : _dest_tuple_desc->slots()) {
         if (!slot_desc->is_materialized()) {
@@ -315,7 +316,12 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
         int result_column_id = -1;
         // PT1 => dest primitive type
         RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
-        auto column_ptr = _src_block.get_by_position(result_column_id).column;
+        bool is_origin_column = result_column_id < origin_column_num;
+        auto column_ptr =
+                is_origin_column && _src_block_mem_reuse
+                        ? _src_block.get_by_position(result_column_id).column->clone_resized(rows)
+                        : _src_block.get_by_position(result_column_id).column;
+
         DCHECK(column_ptr != nullptr);
 
         // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
@@ -373,7 +379,11 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
     }
 
     // after do the dest block insert operation, clear _src_block to remove the reference of origin column
-    _src_block.clear();
+    if (_src_block_mem_reuse) {
+        _src_block.clear_column_data(origin_column_num);
+    } else {
+        _src_block.clear();
+    }
 
     size_t dest_size = dest_block->columns();
     // do filter
@@ -389,15 +399,18 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
 // TODO: opt the reuse of src_block or dest_block column. some case we have to
 // shallow copy the column of src_block to dest block
 Status BaseScanner::_init_src_block() {
-    DCHECK(_src_block.columns() == 0);
-    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
-        SlotDescriptor* slot_desc = _src_slot_descs[i];
-        if (slot_desc == nullptr) {
-            continue;
+    if (_src_block.is_empty_column()) {
+        for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+            SlotDescriptor* slot_desc = _src_slot_descs[i];
+            if (slot_desc == nullptr) {
+                continue;
+            }
+            auto data_type = slot_desc->get_data_type_ptr();
+            auto column_ptr = data_type->create_column();
+            column_ptr->reserve(_state->batch_size());
+            _src_block.insert(vectorized::ColumnWithTypeAndName(std::move(column_ptr), data_type,
+                                                                slot_desc->col_name()));
         }
-        auto data_type = slot_desc->get_data_type_ptr();
-        _src_block.insert(vectorized::ColumnWithTypeAndName(
-                data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
     }
 
     return Status::OK();
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index b59fa113a2..37406cd440 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -142,6 +142,7 @@ protected:
     std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
     std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
     vectorized::Block _src_block;
+    bool _src_block_mem_reuse = false;
     int _num_of_columns_from_file;
 
     // slot_ids for parquet predicate push down are in tuple desc
diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp
index 8baa90b6d5..5db043dc48 100644
--- a/be/src/exec/text_converter.hpp
+++ b/be/src/exec/text_converter.hpp
@@ -169,19 +169,17 @@ inline bool TextConverter::write_slot(const SlotDescriptor* slot_desc, Tuple* tu
 inline void TextConverter::write_string_column(const SlotDescriptor* slot_desc,
                                                vectorized::MutableColumnPtr* column_ptr,
                                                const char* data, size_t len) {
-    vectorized::IColumn* col_ptr = column_ptr->get();
-    // \N means it's NULL
-    if (LIKELY(slot_desc->is_nullable())) {
-        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get());
-        if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
-            nullable_column->insert_data(nullptr, 0);
-            return;
-        } else {
-            nullable_column->get_null_map_data().push_back(0);
-            col_ptr = &nullable_column->get_nested_column();
-        }
+    DCHECK(column_ptr->get()->is_nullable());
+    auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get());
+    if (len == 2 && data[0] == '\\' && data[1] == 'N') {
+        nullable_column->get_null_map_data().push_back(1);
+        reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column())
+                .insert_default();
+    } else {
+        nullable_column->get_null_map_data().push_back(0);
+        reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column())
+                .insert_data(data, len);
     }
-    reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data, len);
 }
 
 inline bool TextConverter::write_column(const SlotDescriptor* slot_desc,
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 5c30a78a3a..18b63a0975 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -918,11 +918,14 @@ void Block::deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor
     }
 }
 
-MutableBlock::MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs) {
+MutableBlock::MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, int reserve_size) {
     for (auto tuple_desc : tuple_descs) {
         for (auto slot_desc : tuple_desc->slots()) {
             _data_types.emplace_back(slot_desc->get_data_type_ptr());
             _columns.emplace_back(_data_types.back()->create_column());
+            if (reserve_size != 0) {
+                _columns.back()->reserve(reserve_size);
+            }
         }
     }
 }
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 1d7ffb6a22..5fe90fdf6d 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -375,7 +375,7 @@ public:
     MutableBlock() = default;
     ~MutableBlock() = default;
 
-    MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs);
+    MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, int reserve_size = 0);
 
     MutableBlock(Block* block)
             : _columns(block->mutate_columns()), _data_types(block->get_data_types()) {}
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 0b25890a50..7b080d18af 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -175,8 +175,8 @@ Status NewOlapScanner::_init_tablet_reader_params(
                             _tablet_reader_params.delete_predicates.begin()));
 
     // Merge the columns in delete predicate that not in latest schema in to current tablet schema
-    for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) {
-        _tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version()));
+    for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
+        _tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_pb->version()));
     }
 
     // Range
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index 41d0dcac1d..01a9f3255e 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -42,6 +42,7 @@ VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile,
         : BrokerScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
                         counter) {
     _text_converter.reset(new (std::nothrow) TextConverter('\\'));
+    _src_block_mem_reuse = true;
 }
 
 VBrokerScanner::~VBrokerScanner() = default;
@@ -81,6 +82,7 @@ Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
             }
         }
     }
+    columns.clear();
 
     return _fill_dest_block(output_block, eof);
 }
@@ -88,7 +90,7 @@ Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
 Status VBrokerScanner::_fill_dest_columns(const Slice& line,
                                           std::vector<MutableColumnPtr>& columns) {
     RETURN_IF_ERROR(_line_split_to_values(line));
-    if (!_success) {
+    if (UNLIKELY(!_success)) {
         // If not success, which means we met an invalid row, return.
         return Status::OK();
     }
@@ -98,19 +100,8 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
         int dest_index = idx++;
 
         auto src_slot_desc = _src_slot_descs[i];
-        if (!src_slot_desc->is_materialized()) {
-            continue;
-        }
 
         const Slice& value = _split_values[i];
-        if (is_null(value)) {
-            // nullable
-            auto* nullable_column =
-                    reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get());
-            nullable_column->insert_default();
-            continue;
-        }
-
         _text_converter->write_string_column(src_slot_desc, &columns[dest_index], value.data,
                                              value.size);
     }
diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h
index 0ed17714b4..9a34a7b1b5 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -1280,31 +1280,33 @@ private:
                 const auto& nested_type = nullable_type.get_nested_type();
 
                 Block tmp_block;
-                if (source_is_nullable)
-                    tmp_block = create_block_with_nested_columns(block, arguments);
-                else
-                    tmp_block = block;
+                if (source_is_nullable) {
+                    tmp_block = create_block_with_nested_columns_only_args(block, arguments);
+                    size_t tmp_res_index = tmp_block.columns();
+                    tmp_block.insert({nullptr, nested_type, ""});
+
+                    /// Perform the requested conversion.
+                    RETURN_IF_ERROR(
+                            wrapper(context, tmp_block, {0}, tmp_res_index, input_rows_count));
+
+                    const auto& tmp_res = tmp_block.get_by_position(tmp_res_index);
 
-                size_t tmp_res_index = block.columns();
-                tmp_block.insert({nullptr, nested_type, ""});
+                    res.column = wrap_in_nullable(
+                            tmp_res.column, Block({block.get_by_position(arguments[0]), tmp_res}),
+                            {0}, 1, input_rows_count);
+                } else {
+                    tmp_block = block;
 
-                /// Perform the requested conversion.
-                RETURN_IF_ERROR(
-                        wrapper(context, tmp_block, arguments, tmp_res_index, input_rows_count));
+                    size_t tmp_res_index = block.columns();
+                    tmp_block.insert({nullptr, nested_type, ""});
 
-                const auto& tmp_res = tmp_block.get_by_position(tmp_res_index);
+                    /// Perform the requested conversion.
+                    RETURN_IF_ERROR(wrapper(context, tmp_block, arguments, tmp_res_index,
+                                            input_rows_count));
 
-                /// May happen in fuzzy tests. For debug purpose.
-                if (!tmp_res.column.get()) {
-                            "Couldn't convert {} to {} in prepare_remove_nullable wrapper.",
-                            block.get_by_position(arguments[0]).type->get_name(),
block.get_by_position(arguments[0]).type->get_name(),
-                            nested_type->get_name());
+                    res.column = tmp_block.get_by_position(tmp_res_index).column;
                 }
 
-                res.column = wrap_in_nullable(tmp_res.column,
-                                              Block({block.get_by_position(arguments[0]), tmp_res}),
-                                              {0}, 1, input_rows_count);
                 return Status::OK();
             };
         } else if (source_is_nullable) {
diff --git a/be/src/vec/functions/function_helpers.cpp b/be/src/vec/functions/function_helpers.cpp
index 0e98d88ee0..fcfdd4b3a2 100644
--- a/be/src/vec/functions/function_helpers.cpp
+++ b/be/src/vec/functions/function_helpers.cpp
@@ -26,6 +26,39 @@
 
 namespace doris::vectorized {
 
+Block create_block_with_nested_columns_only_args(const Block& block, const ColumnNumbers& args) {
+    std::set<size_t> args_set(args.begin(), args.end());
+    Block res;
+
+    for (auto i : args_set) {
+        const auto& col = block.get_by_position(i);
+
+        if (col.type->is_nullable()) {
+            const DataTypePtr& nested_type =
+                    static_cast<const DataTypeNullable&>(*col.type).get_nested_type();
+
+            if (!col.column) {
+                res.insert({nullptr, nested_type, col.name});
+            } else if (auto* nullable = check_and_get_column<ColumnNullable>(*col.column)) {
+                const auto& nested_col = nullable->get_nested_column_ptr();
+                res.insert({nested_col, nested_type, col.name});
+            } else if (auto* const_column = check_and_get_column<ColumnConst>(*col.column)) {
+                const auto& nested_col =
+                        check_and_get_column<ColumnNullable>(const_column->get_data_column())
+                                ->get_nested_column_ptr();
+                res.insert({ColumnConst::create(nested_col, col.column->size()), nested_type,
+                            col.name});
+            } else {
+                LOG(FATAL) << "Illegal column for DataTypeNullable";
+            }
+        } else {
+            res.insert(col);
+        }
+    }
+
+    return res;
+}
+
 static Block create_block_with_nested_columns_impl(const Block& block,
                                                    const std::unordered_set<size_t>& args) {
     Block res;
diff --git a/be/src/vec/functions/function_helpers.h b/be/src/vec/functions/function_helpers.h
index ac6601b06e..e1c1b192bb 100644
--- a/be/src/vec/functions/function_helpers.h
+++ b/be/src/vec/functions/function_helpers.h
@@ -95,6 +95,13 @@ Block create_block_with_nested_columns(const Block& block, const ColumnNumbers&
 Block create_block_with_nested_columns(const Block& block, const ColumnNumbers& args,
                                        size_t result);
 
+/// Returns a new block holding only the columns at the positions listed in
+/// `args` (deduplicated, in ascending position order); each nullable column
+/// is replaced with its respective nested column.
+/// TODO: the old function `create_block_with_nested_columns` has a performance problem; replace
+/// all of its callers with this function and delete the old one.
+Block create_block_with_nested_columns_only_args(const Block& block, const ColumnNumbers& args);
+
 /// Checks argument type at specified index with predicate.
 /// throws if there is no argument at specified index or if predicate returns false.
 void validate_argument_type(const IFunction& func, const DataTypes& arguments,
diff --git a/be/test/vec/exec/vbroker_scan_node_test.cpp b/be/test/vec/exec/vbroker_scan_node_test.cpp
index 6cff164b10..195f468841 100644
--- a/be/test/vec/exec/vbroker_scan_node_test.cpp
+++ b/be/test/vec/exec/vbroker_scan_node_test.cpp
@@ -227,7 +227,7 @@ void VBrokerScanNodeTest::init_desc_table() {
         slot_desc.columnPos = 0;
         slot_desc.byteOffset = 0;
         slot_desc.nullIndicatorByte = 0;
-        slot_desc.nullIndicatorBit = -1;
+        slot_desc.nullIndicatorBit = 0;
         slot_desc.colName = "k1";
         slot_desc.slotIdx = 1;
         slot_desc.isMaterialized = true;
@@ -254,7 +254,7 @@ void VBrokerScanNodeTest::init_desc_table() {
         slot_desc.columnPos = 1;
         slot_desc.byteOffset = 16;
         slot_desc.nullIndicatorByte = 0;
-        slot_desc.nullIndicatorBit = -1;
+        slot_desc.nullIndicatorBit = 1;
         slot_desc.colName = "k2";
         slot_desc.slotIdx = 2;
         slot_desc.isMaterialized = true;
@@ -281,7 +281,7 @@ void VBrokerScanNodeTest::init_desc_table() {
         slot_desc.columnPos = 2;
         slot_desc.byteOffset = 32;
         slot_desc.nullIndicatorByte = 0;
-        slot_desc.nullIndicatorBit = -1;
+        slot_desc.nullIndicatorBit = 2;
         slot_desc.colName = "k3";
         slot_desc.slotIdx = 3;
         slot_desc.isMaterialized = true;
@@ -308,7 +308,7 @@ void VBrokerScanNodeTest::init_desc_table() {
         slot_desc.columnPos = 3;
         slot_desc.byteOffset = 48;
         slot_desc.nullIndicatorByte = 0;
-        slot_desc.nullIndicatorBit = -1;
+        slot_desc.nullIndicatorBit = 3;
         slot_desc.colName = "k4";
         slot_desc.slotIdx = 4;
         slot_desc.isMaterialized = true;
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp
index 7824f10c30..5cb9afc4b2 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -209,7 +209,7 @@ void VBrokerScannerTest::init_desc_table() {
         slot_desc.columnPos = 0;
         slot_desc.byteOffset = 0;
         slot_desc.nullIndicatorByte = 0;
-        slot_desc.nullIndicatorBit = -1;
+        slot_desc.nullIndicatorBit = 0;
         slot_desc.colName = "k1";
         slot_desc.slotIdx = 1;
         slot_desc.isMaterialized = true;
@@ -236,7 +236,7 @@ void VBrokerScannerTest::init_desc_table() {
         slot_desc.columnPos = 1;
         slot_desc.byteOffset = 16;
         slot_desc.nullIndicatorByte = 0;
-        slot_desc.nullIndicatorBit = -1;
+        slot_desc.nullIndicatorBit = 1;
         slot_desc.colName = "k2";
         slot_desc.slotIdx = 2;
         slot_desc.isMaterialized = true;
@@ -263,7 +263,7 @@ void VBrokerScannerTest::init_desc_table() {
         slot_desc.columnPos = 2;
         slot_desc.byteOffset = 32;
         slot_desc.nullIndicatorByte = 0;
-        slot_desc.nullIndicatorBit = -1;
+        slot_desc.nullIndicatorBit = 2;
         slot_desc.colName = "k3";
         slot_desc.slotIdx = 3;
         slot_desc.isMaterialized = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to