This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c426c2e4b1 [Vectorized-Load] Support vectorized load table with materialized view (#9923)
c426c2e4b1 is described below

commit c426c2e4b1c8e0c4043d68d9ec3ec9722bbdbcca
Author: HappenLee <[email protected]>
AuthorDate: Thu Jun 2 14:59:01 2022 +0800

    [Vectorized-Load] Support vectorized load table with materialized view (#9923)
    
    * [Vectorized-Load] Support vectorized load table with materialized view
    
    * fix ut
    
    Co-authored-by: lihaopeng <[email protected]>
---
 be/src/exec/tablet_sink.cpp        |  2 +-
 be/src/olap/memtable.cpp           | 24 +++++++++++++++++++-----
 be/src/olap/memtable.h             |  3 +++
 be/src/vec/core/block.cpp          |  9 +++++++++
 be/src/vec/core/block.h            |  3 +++
 be/test/olap/delta_writer_test.cpp |  3 ++-
 6 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 3166736e7d..f5567a9165 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -942,7 +942,7 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
     }
 
     // check intolerable failure
-    for (auto index_channel : _channels) {
+    for (const auto& index_channel : _channels) {
         RETURN_IF_ERROR(index_channel->check_intolerable_failure());
     }
     return Status::OK();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 08c92c3404..e2fefd0149 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -53,6 +53,7 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
         // TODO: Support ZOrderComparator in the future
         _vec_skip_list = std::make_unique<VecTable>(
                 _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type 
== KeysType::DUP_KEYS);
+        _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
     } else {
         _vec_skip_list = nullptr;
         if (_keys_type == KeysType::DUP_KEYS) {
@@ -75,6 +76,27 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
                                              _keys_type == KeysType::DUP_KEYS);
     }
 }
+
+// Fills _column_offset with, for each slot in `slot_descs` (in order), the position of the
+// slot with the same id in `tuple_desc->slots()`. insert() passes these positions to
+// Block::copy_block to select and reorder the columns of each incoming block.
+void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
+                                                  const TupleDescriptor* tuple_desc) {
+    const auto& slots = tuple_desc->slots();
+    _column_offset.reserve(slot_descs->size());
+    for (const auto* slot_desc : *slot_descs) {
+        bool found = false;
+        for (int j = 0; j < static_cast<int>(slots.size()); ++j) {
+            if (slot_desc->id() == slots[j]->id()) {
+                _column_offset.emplace_back(j);
+                found = true;
+                break;
+            }
+        }
+        // An unmatched slot would silently shift every later column of the projected block.
+        DCHECK(found) << "slot id " << slot_desc->id() << " not found in input tuple";
+    }
+}
 
 void MemTable::_init_agg_functions(const vectorized::Block* block) {
     for (uint32_t cid = _schema->num_key_columns(); cid < 
_schema->num_columns(); ++cid) {
@@ -114,21 +127,22 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
                                *_pblock, -1);
 }
 
-void MemTable::insert(const vectorized::Block* block, const std::vector<int>& 
row_idxs) {
+void MemTable::insert(const vectorized::Block* input_block, const 
std::vector<int>& row_idxs) {
+    auto target_block = input_block->copy_block(_column_offset);
     if (_is_first_insertion) {
         _is_first_insertion = false;
-        auto cloneBlock = block->clone_without_columns();
+        auto cloneBlock = target_block.clone_without_columns();
         _input_mutable_block = 
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
         _vec_row_comparator->set_block(&_input_mutable_block);
         _output_mutable_block = 
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
         if (_keys_type != KeysType::DUP_KEYS) {
-            _init_agg_functions(block);
+            _init_agg_functions(&target_block);
         }
     }
     auto num_rows = row_idxs.size();
     size_t cursor_in_mutableblock = _input_mutable_block.rows();
-    _input_mutable_block.add_rows(block, row_idxs.data(), row_idxs.data() + 
num_rows);
-    size_t input_size = block->allocated_bytes() * num_rows / block->rows();
+    _input_mutable_block.add_rows(&target_block, row_idxs.data(), 
row_idxs.data() + num_rows);
+    size_t input_size = target_block.allocated_bytes() * num_rows / 
target_block.rows();
     _mem_usage += input_size;
     _mem_tracker->consume(input_size);
 
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index c594f77679..d8f16ba4c5 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -185,6 +185,9 @@ private:
 
     std::unique_ptr<VecTable> _vec_skip_list;
     VecTable::Hint _vec_hint;
+    void _init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
+                                            const TupleDescriptor* tuple_desc);
+    std::vector<int> _column_offset;
 
     RowsetWriter* _rowset_writer;
 
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 9f743f5ca6..084a44611f 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -613,6 +613,20 @@ void filter_block_internal(Block* block, const IColumn::Filter& filter, uint32_t
     }
 }
 
+// Returns a new block holding the entries of this block at the positions listed in
+// `column_offset`, in that order (a position may repeat or be left out). Entries are
+// copied by value; whether column data is shared depends on ColumnPtr copy semantics.
+Block Block::copy_block(const std::vector<int>& column_offset) const {
+    ColumnsWithTypeAndName columns_with_type_and_name;
+    columns_with_type_and_name.reserve(column_offset.size());
+    for (const auto offset : column_offset) {
+        // Check the sign explicitly and compare as size_t to avoid a signed/unsigned compare.
+        DCHECK(offset >= 0 && static_cast<size_t>(offset) < data.size());
+        columns_with_type_and_name.emplace_back(data[offset]);
+    }
+    return columns_with_type_and_name;
+}
+
 Status Block::filter_block(Block* block, int filter_column_id, int 
column_to_keep) {
     ColumnPtr filter_column = block->get_by_position(filter_column_id).column;
     if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*filter_column)) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 532e9257aa..5f62365382 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -245,6 +245,9 @@ public:
     /** Get one line data from block, only use in load data */
     std::string dump_one_line(size_t row, int column_end) const;
 
+    // copy a new block by the offset column
+    Block copy_block(const std::vector<int>& column_offset) const;
+
     static Status filter_block(Block* block, int filter_conlumn_id, int 
column_to_keep);
 
     static void erase_useless_column(Block* block, int column_to_keep) {
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 2326bd156a..db9429893a 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -361,7 +361,8 @@ TEST_F(TestDeltaWriter, open) {
     PUniqueId load_id;
     load_id.set_hi(0);
     load_id.set_lo(0);
-    WriteRequest write_req = {10003, 270068375, WriteType::LOAD, 20001, 30001, 
load_id, tuple_desc};
+    WriteRequest write_req = {10003,   270068375,  WriteType::LOAD,      
20001, 30001,
+                              load_id, tuple_desc, &tuple_desc->slots(), true};
     DeltaWriter* delta_writer = nullptr;
     DeltaWriter::open(&write_req, &delta_writer);
     EXPECT_NE(delta_writer, nullptr);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to