This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new c426c2e4b1 [Vectorized-Load] Support vectorized load table with materialized view (#9923)
c426c2e4b1 is described below
commit c426c2e4b1c8e0c4043d68d9ec3ec9722bbdbcca
Author: HappenLee <[email protected]>
AuthorDate: Thu Jun 2 14:59:01 2022 +0800
[Vectorized-Load] Support vectorized load table with materialized view (#9923)
* [Vectorized-Load] Support vectorized load table with materialized view
* fix ut
Co-authored-by: lihaopeng <[email protected]>
---
be/src/exec/tablet_sink.cpp | 2 +-
be/src/olap/memtable.cpp | 24 +++++++++++++++++++-----
be/src/olap/memtable.h | 3 +++
be/src/vec/core/block.cpp | 9 +++++++++
be/src/vec/core/block.h | 3 +++
be/test/olap/delta_writer_test.cpp | 3 ++-
6 files changed, 37 insertions(+), 7 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 3166736e7d..f5567a9165 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -942,7 +942,7 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
}
// check intolerable failure
- for (auto index_channel : _channels) {
+ for (const auto& index_channel : _channels) {
RETURN_IF_ERROR(index_channel->check_intolerable_failure());
}
return Status::OK();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 08c92c3404..e2fefd0149 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -53,6 +53,7 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
// TODO: Support ZOrderComparator in the future
_vec_skip_list = std::make_unique<VecTable>(
_vec_row_comparator.get(), _table_mem_pool.get(), _keys_type
== KeysType::DUP_KEYS);
+ _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
} else {
_vec_skip_list = nullptr;
if (_keys_type == KeysType::DUP_KEYS) {
@@ -75,6 +76,18 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
_keys_type == KeysType::DUP_KEYS);
}
}
+void MemTable::_init_columns_offset_by_slot_descs(const
std::vector<SlotDescriptor*>* slot_descs,
+ const TupleDescriptor*
tuple_desc) {
+ for (auto slot_desc : *slot_descs) {
+ const auto& slots = tuple_desc->slots();
+ for (int j = 0; j < slots.size(); ++j) {
+ if (slot_desc->id() == slots[j]->id()) {
+ _column_offset.emplace_back(j);
+ break;
+ }
+ }
+ }
+}
void MemTable::_init_agg_functions(const vectorized::Block* block) {
for (uint32_t cid = _schema->num_key_columns(); cid <
_schema->num_columns(); ++cid) {
@@ -114,21 +127,22 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
*_pblock, -1);
}
-void MemTable::insert(const vectorized::Block* block, const std::vector<int>&
row_idxs) {
+void MemTable::insert(const vectorized::Block* input_block, const
std::vector<int>& row_idxs) {
+ auto target_block = input_block->copy_block(_column_offset);
if (_is_first_insertion) {
_is_first_insertion = false;
- auto cloneBlock = block->clone_without_columns();
+ auto cloneBlock = target_block.clone_without_columns();
_input_mutable_block =
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
_vec_row_comparator->set_block(&_input_mutable_block);
_output_mutable_block =
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
if (_keys_type != KeysType::DUP_KEYS) {
- _init_agg_functions(block);
+ _init_agg_functions(&target_block);
}
}
auto num_rows = row_idxs.size();
size_t cursor_in_mutableblock = _input_mutable_block.rows();
- _input_mutable_block.add_rows(block, row_idxs.data(), row_idxs.data() +
num_rows);
- size_t input_size = block->allocated_bytes() * num_rows / block->rows();
+ _input_mutable_block.add_rows(&target_block, row_idxs.data(),
row_idxs.data() + num_rows);
+ size_t input_size = target_block.allocated_bytes() * num_rows /
target_block.rows();
_mem_usage += input_size;
_mem_tracker->consume(input_size);
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index c594f77679..d8f16ba4c5 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -185,6 +185,9 @@ private:
std::unique_ptr<VecTable> _vec_skip_list;
VecTable::Hint _vec_hint;
+ void _init_columns_offset_by_slot_descs(const
std::vector<SlotDescriptor*>* slot_descs,
+ const TupleDescriptor* tuple_desc);
+ std::vector<int> _column_offset;
RowsetWriter* _rowset_writer;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 9f743f5ca6..084a44611f 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -613,6 +613,15 @@ void filter_block_internal(Block* block, const IColumn::Filter& filter, uint32_t
}
}
+Block Block::copy_block(const std::vector<int>& column_offset) const {
+ ColumnsWithTypeAndName columns_with_type_and_name;
+ for (auto offset : column_offset) {
+ DCHECK(offset < data.size());
+ columns_with_type_and_name.emplace_back(data[offset]);
+ }
+ return columns_with_type_and_name;
+}
+
Status Block::filter_block(Block* block, int filter_column_id, int
column_to_keep) {
ColumnPtr filter_column = block->get_by_position(filter_column_id).column;
if (auto* nullable_column =
check_and_get_column<ColumnNullable>(*filter_column)) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 532e9257aa..5f62365382 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -245,6 +245,9 @@ public:
/** Get one line data from block, only use in load data */
std::string dump_one_line(size_t row, int column_end) const;
+ // copy a new block by the offset column
+ Block copy_block(const std::vector<int>& column_offset) const;
+
static Status filter_block(Block* block, int filter_conlumn_id, int
column_to_keep);
static void erase_useless_column(Block* block, int column_to_keep) {
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 2326bd156a..db9429893a 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -361,7 +361,8 @@ TEST_F(TestDeltaWriter, open) {
PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
- WriteRequest write_req = {10003, 270068375, WriteType::LOAD, 20001, 30001,
load_id, tuple_desc};
+ WriteRequest write_req = {10003, 270068375, WriteType::LOAD,
20001, 30001,
+ load_id, tuple_desc, &tuple_desc->slots(), true};
DeltaWriter* delta_writer = nullptr;
DeltaWriter::open(&write_req, &delta_writer);
EXPECT_NE(delta_writer, nullptr);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]