This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new dda7604e16 [Bug][Storage-vectorized] fix core dump on outer join with
not nullable column (#9112)
dda7604e16 is described below
commit dda7604e16c0e6fed07708855f47c294b47c8e90
Author: Pxl <[email protected]>
AuthorDate: Thu Apr 21 11:02:04 2022 +0800
[Bug][Storage-vectorized] fix core dump on outer join with not nullable
column (#9112)
---
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 49 ++++++-------------
be/src/olap/rowset/segment_v2/segment_iterator.h | 11 ++---
be/src/runtime/thread_mem_tracker_mgr.h | 1 -
be/src/vec/core/block.cpp | 9 ++++
be/src/vec/core/block.h | 42 ++++++++--------
be/src/vec/exec/volap_scan_node.cpp | 56 +++++++++++-----------
be/src/vec/exec/volap_scan_node.h | 3 ++
7 files changed, 81 insertions(+), 90 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index e2da9f2bd2..6c342c6742 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -662,8 +662,10 @@ void SegmentIterator::_vec_init_lazy_materialization() {
// todo(wb) make a cost-based lazy-materialization
framework
// check non-pred column type to decide whether using
lazy-materialization
FieldType type = _schema.column(cid)->type();
- if (_is_all_column_basic_type && (type ==
OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT
- || type == OLAP_FIELD_TYPE_VARCHAR || type ==
OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_STRING)) {
+ if (_is_all_column_basic_type &&
+ (type == OLAP_FIELD_TYPE_HLL || type ==
OLAP_FIELD_TYPE_OBJECT ||
+ type == OLAP_FIELD_TYPE_VARCHAR || type ==
OLAP_FIELD_TYPE_CHAR ||
+ type == OLAP_FIELD_TYPE_STRING)) {
_is_all_column_basic_type = false;
}
}
@@ -747,17 +749,7 @@ Status SegmentIterator::_read_columns(const
std::vector<ColumnId>& column_ids,
void SegmentIterator::_init_current_block(
vectorized::Block* block, std::vector<vectorized::MutableColumnPtr>&
current_columns) {
- bool is_block_mem_reuse = block->mem_reuse();
- if (is_block_mem_reuse) {
- block->clear_column_data(_schema.num_column_ids());
- } else { // pre fill output block here
- for (size_t i = 0; i < _schema.num_column_ids(); i++) {
- auto cid = _schema.column_id(i);
- auto column_desc = _schema.column(cid);
- auto data_type = Schema::get_data_type_ptr(*column_desc);
- block->insert({nullptr, std::move(data_type),
column_desc->name()});
- }
- }
+ block->clear_column_data(_schema.num_column_ids());
for (size_t i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
@@ -766,12 +758,8 @@ void SegmentIterator::_init_current_block(
if (_is_pred_column[cid]) { //todo(wb) maybe we can release it after
output block
current_columns[cid]->clear();
} else { // non-predicate column
- if (is_block_mem_reuse) {
- current_columns[cid] =
std::move(*block->get_by_position(i).column).mutate();
- } else {
- auto data_type = Schema::get_data_type_ptr(*column_desc);
- current_columns[cid] = data_type->create_column();
- }
+ current_columns[cid] =
std::move(*block->get_by_position(i).column).mutate();
+
if (column_desc->type() == OLAP_FIELD_TYPE_DATE) {
current_columns[cid]->set_date_type();
} else if (column_desc->type() == OLAP_FIELD_TYPE_DATETIME) {
@@ -782,7 +770,7 @@ void SegmentIterator::_init_current_block(
}
}
-void SegmentIterator::_output_non_pred_columns(vectorized::Block* block, bool
is_block_mem_reuse) {
+void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
for (auto cid : _non_predicate_columns) {
block->replace_by_position(_schema_block_id_map[cid],
@@ -907,6 +895,8 @@ void
SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column
Status SegmentIterator::next_batch(vectorized::Block* block) {
bool is_mem_reuse = block->mem_reuse();
+ DCHECK(is_mem_reuse);
+
SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
if (UNLIKELY(!_inited)) {
RETURN_IF_ERROR(_init(true));
@@ -941,24 +931,15 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
// todo(wb) abstract make column where
if (!_is_pred_column[cid]) { // non-predicate
block->replace_by_position(i,
std::move(_current_return_columns[cid]));
- } else { // predicate
- if (!is_mem_reuse) {
- auto column_desc = _schema.column(cid);
- auto data_type = Schema::get_data_type_ptr(*column_desc);
- block->replace_by_position(i, data_type->create_column());
- }
}
}
- // not sure whether block is clear before enter segmentIter, so clear
it here.
- if (is_mem_reuse) {
- block->clear_column_data();
- }
+ block->clear_column_data();
return Status::EndOfFile("no more data in segment");
}
// when no predicate(include delete condition) is provided, output column
directly
if (_vec_pred_column_ids.empty() && _short_cir_pred_column_ids.empty()) {
- _output_non_pred_columns(block, is_mem_reuse);
+ _output_non_pred_columns(block);
} else { // need predicate evaluation
uint16_t selected_size = nrows_read;
uint16_t sel_rowid_idx[selected_size];
@@ -970,7 +951,7 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
// So output block directly after vectorization evaluation
if (_is_all_column_basic_type) {
RETURN_IF_ERROR(_output_column_by_sel_idx(block,
_first_read_column_ids, sel_rowid_idx,
- selected_size,
is_mem_reuse));
+ selected_size));
} else {
// step 2: evaluate short ciruit predicate
// todo(wb) research whether need to read short predicate after
vectorization evaluation
@@ -986,7 +967,7 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
// step4: output columns
// 4.1 output non-predicate column
- _output_non_pred_columns(block, is_mem_reuse);
+ _output_non_pred_columns(block);
// 4.2 get union of short_cir_pred and vec_pred
std::set<ColumnId> pred_column_ids;
@@ -996,7 +977,7 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
// 4.3 output short circuit and predicate column
RETURN_IF_ERROR(_output_column_by_sel_idx(block, pred_column_ids,
sel_rowid_idx,
- selected_size,
is_mem_reuse));
+ selected_size));
}
}
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 42a2cafc90..489d19b2a2 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -101,21 +101,20 @@ private:
std::vector<vectorized::MutableColumnPtr>&
non_pred_vector);
void _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t&
selected_size);
void _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t*
selected_size);
- void _output_non_pred_columns(vectorized::Block* block, bool
is_block_mem_reuse);
+ void _output_non_pred_columns(vectorized::Block* block);
void _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
std::vector<rowid_t>& rowid_vector, uint16_t*
sel_rowid_idx,
size_t select_size,
vectorized::MutableColumns* mutable_columns);
template <class Container>
Status _output_column_by_sel_idx(vectorized::Block* block, const
Container& column_ids,
- uint16_t* sel_rowid_idx, uint16_t
select_size,
- bool is_block_mem_reuse) {
+ uint16_t* sel_rowid_idx, uint16_t
select_size) {
SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
for (auto cid : column_ids) {
int block_cid = _schema_block_id_map[cid];
- RETURN_IF_ERROR(block->copy_column_data_to_block(
- is_block_mem_reuse, _current_return_columns[cid].get(),
sel_rowid_idx,
- select_size, block_cid, _opts.block_row_max));
+
RETURN_IF_ERROR(block->copy_column_data_to_block(_current_return_columns[cid].get(),
+ sel_rowid_idx,
select_size, block_cid,
+
_opts.block_row_max));
}
return Status::OK();
}
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h
b/be/src/runtime/thread_mem_tracker_mgr.h
index 79c152c243..4ca2adba3e 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -103,7 +103,6 @@ public:
void update_tracker_id(int64_t tracker_id);
void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
- DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end());
_mem_trackers[mem_tracker->id()] = mem_tracker;
DCHECK(_mem_trackers[mem_tracker->id()]);
_untracked_mems[mem_tracker->id()] = 0;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index dc970df517..e304a1973e 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -65,6 +65,15 @@ Block::Block(const ColumnsWithTypeAndName& data_) : data
{data_} {
initialize_index_by_name();
}
+Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) {
+ for (const auto slot_desc : slots) {
+ auto column_ptr = slot_desc->get_empty_mutable_column();
+ column_ptr->reserve(block_size);
+ insert(ColumnWithTypeAndName(std::move(column_ptr),
slot_desc->get_data_type_ptr(),
+ slot_desc->col_name()));
+ }
+}
+
Block::Block(const PBlock& pblock) {
const char* buf = nullptr;
std::string compression_scratch;
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 087f4959cc..ee032900a3 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -29,6 +29,8 @@
#include <vector>
#include "gen_cpp/data.pb.h"
+#include "runtime/descriptors.h"
+#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block_info.h"
#include "vec/core/column_with_type_and_name.h"
@@ -70,6 +72,7 @@ public:
Block(std::initializer_list<ColumnWithTypeAndName> il);
Block(const ColumnsWithTypeAndName& data_);
Block(const PBlock& pblock);
+ Block(const std::vector<SlotDescriptor*>& slots, size_t block_size);
/// insert the column at the specified position
void insert(size_t position, const ColumnWithTypeAndName& elem);
@@ -100,8 +103,7 @@ public:
ColumnWithTypeAndName& get_by_position(size_t position) { return
data[position]; }
const ColumnWithTypeAndName& get_by_position(size_t position) const {
return data[position]; }
- Status copy_column_data_to_block(bool is_block_mem_reuse,
- doris::vectorized::IColumn* input_col_ptr,
+ Status copy_column_data_to_block(doris::vectorized::IColumn* input_col_ptr,
uint16_t* sel_rowid_idx, uint16_t
select_size, int block_cid,
size_t batch_size) {
// Only the additional deleted filter condition need to materialize
column be at the end of the block
@@ -111,25 +113,22 @@ public:
// `select b from table;`
// a column only effective in segment iterator, the block from query
engine only contain the b column.
// so the `block_cid >= data.size()` is true
- if (block_cid >= data.size())
+ if (block_cid >= data.size()) {
return Status::OK();
+ }
- if (is_block_mem_reuse) {
- auto* raw_res_ptr = this->get_by_position(block_cid).column.get();
-
const_cast<doris::vectorized::IColumn*>(raw_res_ptr)->reserve(batch_size);
- return input_col_ptr->filter_by_selector(
- sel_rowid_idx, select_size,
- const_cast<doris::vectorized::IColumn*>(raw_res_ptr));
- } else {
- MutableColumnPtr res_col_ptr =
data[block_cid].type->create_column();
- res_col_ptr->reserve(batch_size);
- auto* raw_res_ptr = res_col_ptr.get();
- RETURN_IF_ERROR(input_col_ptr->filter_by_selector(
- sel_rowid_idx, select_size,
- const_cast<doris::vectorized::IColumn*>(raw_res_ptr)));
- this->replace_by_position(block_cid, std::move(res_col_ptr));
- return Status::OK();
+ MutableColumnPtr raw_res_ptr =
this->get_by_position(block_cid).column->assume_mutable();
+ raw_res_ptr->reserve(batch_size);
+
+ // adapt for outer join change column to nullable
+ if (raw_res_ptr->is_nullable()) {
+ auto col_ptr_nullable =
+
reinterpret_cast<vectorized::ColumnNullable*>(raw_res_ptr.get());
+
col_ptr_nullable->get_null_map_column().insert_many_defaults(select_size);
+ raw_res_ptr = col_ptr_nullable->get_nested_column_ptr();
}
+
+ return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size,
raw_res_ptr);
}
void replace_by_position(size_t position, ColumnPtr&& res) {
@@ -311,9 +310,8 @@ public:
private:
void erase_impl(size_t position);
void initialize_index_by_name();
- bool is_column_data_null(const doris::TypeDescriptor& type_desc,
- const StringRef& data_ref,
- const IColumn* column_with_type_and_name,
int row);
+ bool is_column_data_null(const doris::TypeDescriptor& type_desc, const
StringRef& data_ref,
+ const IColumn* column_with_type_and_name, int
row);
void deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor&
type_desc,
const StringRef& data_ref, const IColumn* column, int
row,
bool padding_char);
@@ -351,7 +349,7 @@ public:
size_t rows() const;
size_t columns() const { return _columns.size(); }
- bool empty() { return rows() == 0; }
+ bool empty() const { return rows() == 0; }
MutableColumns& mutable_columns() { return _columns; }
diff --git a/be/src/vec/exec/volap_scan_node.cpp
b/be/src/vec/exec/volap_scan_node.cpp
index 604dda8e72..4dba1e7c28 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -66,25 +66,20 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
auto doris_scanner_row_num =
_limit == -1 ? config::doris_scanner_row_num
:
std::min(static_cast<int64_t>(config::doris_scanner_row_num), _limit);
- auto block_size = _limit == -1 ? state->batch_size()
- :
std::min(static_cast<int64_t>(state->batch_size()), _limit);
- auto block_per_scanner = (doris_scanner_row_num + (block_size - 1)) /
block_size;
+ _block_size = _limit == -1 ? state->batch_size()
+ :
std::min(static_cast<int64_t>(state->batch_size()), _limit);
+ auto block_per_scanner = (doris_scanner_row_num + (_block_size - 1)) /
_block_size;
auto pre_block_count =
std::min(_volap_scanners.size(),
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) *
block_per_scanner;
for (int i = 0; i < pre_block_count; ++i) {
- auto block = new Block;
- for (const auto slot_desc : _tuple_desc->slots()) {
- auto column_ptr = slot_desc->get_empty_mutable_column();
- column_ptr->reserve(block_size);
- block->insert(ColumnWithTypeAndName(
- std::move(column_ptr), slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
- }
+ auto block = new Block(_tuple_desc->slots(), _block_size);
_free_blocks.emplace_back(block);
_buffered_bytes += block->allocated_bytes();
}
+
_block_mem_tracker->consume(_buffered_bytes);
// read from scanner
@@ -155,7 +150,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
Status status = Status::OK();
bool eos = false;
RuntimeState* state = scanner->runtime_state();
- DCHECK(NULL != state);
+ DCHECK(nullptr != state);
if (!scanner->is_open()) {
status = scanner->open();
if (!status.ok()) {
@@ -206,8 +201,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
bool get_free_block = true;
- while (!eos && raw_rows_read < raw_rows_threshold &&
- raw_bytes_read < raw_bytes_threshold && get_free_block) {
+ while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read <
raw_bytes_threshold &&
+ get_free_block) {
if (UNLIKELY(_transfer_done)) {
eos = true;
status = Status::Cancelled("Cancelled");
@@ -233,7 +228,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
std::lock_guard<std::mutex> l(_free_blocks_lock);
_free_blocks.emplace_back(block);
} else {
- if (!blocks.empty() && blocks.back()->rows() + block->rows() <=
_runtime_state->batch_size()) {
+ if (!blocks.empty() &&
+ blocks.back()->rows() + block->rows() <=
_runtime_state->batch_size()) {
MutableBlock(blocks.back()).merge(*block);
block->clear_column_data();
std::lock_guard<std::mutex> l(_free_blocks_lock);
@@ -339,8 +335,8 @@ Status VOlapScanNode::start_scan_thread(RuntimeState*
state) {
for (auto& scan_range : _scan_ranges) {
auto tablet_id = scan_range->tablet_id;
std::string err;
- TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
- tablet_id, true, &err);
+ TabletSharedPtr tablet =
+
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
if (tablet == nullptr) {
std::stringstream ss;
ss << "failed to get tablet: " << tablet_id << ", reason: " << err;
@@ -411,7 +407,9 @@ Status VOlapScanNode::close(RuntimeState* state) {
_scan_block_added_cv.notify_all();
// join transfer thread
- if (_transfer_thread) _transfer_thread->join();
+ if (_transfer_thread) {
+ _transfer_thread->join();
+ }
// clear some block in queue
// TODO: The presence of transfer_thread here may cause Block's memory
alloc and be released not in a thread,
@@ -478,7 +476,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
}
// wait for block from queue
- Block* materialized_block = NULL;
+ Block* materialized_block = nullptr;
{
std::unique_lock<std::mutex> l(_blocks_lock);
SCOPED_TIMER(_olap_wait_batch_queue_timer);
@@ -493,14 +491,14 @@ Status VOlapScanNode::get_next(RuntimeState* state,
Block* block, bool* eos) {
if (!_materialized_blocks.empty()) {
materialized_block = _materialized_blocks.back();
- DCHECK(materialized_block != NULL);
+ DCHECK(materialized_block != nullptr);
_materialized_blocks.pop_back();
_materialized_row_batches_bytes -=
materialized_block->allocated_bytes();
}
}
// return block
- if (NULL != materialized_block) {
+ if (nullptr != materialized_block) {
// notify scanner
_block_consumed_cv.notify_one();
// get scanner's block memory
@@ -536,8 +534,6 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
return _status;
}
-// TODO: we should register the mem cost of new Block in
-// alloc block
Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
{
std::lock_guard<std::mutex> l(_free_blocks_lock);
@@ -547,15 +543,19 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
return block;
}
}
+
get_free_block = false;
- return new Block();
+
+ auto block = new Block(_tuple_desc->slots(), _block_size);
+ _buffered_bytes += block->allocated_bytes();
+ return block;
}
int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int
block_per_scanner) {
std::list<VOlapScanner*> olap_scanners;
int assigned_thread_num = _running_thread;
size_t max_thread = std::min(_volap_scanners.size(),
-
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num));
+
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num));
// copy to local
{
// How many thread can apply to this query
@@ -566,7 +566,9 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState*
state, int block_per
thread_slot_num = _free_blocks.size() / block_per_scanner;
thread_slot_num += (_free_blocks.size() % block_per_scanner !=
0);
thread_slot_num = std::min(thread_slot_num, max_thread -
assigned_thread_num);
- if (thread_slot_num <= 0) thread_slot_num = 1;
+ if (thread_slot_num <= 0) {
+ thread_slot_num = 1;
+ }
} else {
std::lock_guard<std::mutex> l(_scan_blocks_lock);
if (_scan_blocks.empty()) {
@@ -586,9 +588,9 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState*
state, int block_per
auto scanner = _volap_scanners.front();
_volap_scanners.pop_front();
- if (scanner->need_to_close())
+ if (scanner->need_to_close()) {
scanner->close(state);
- else {
+ } else {
olap_scanners.push_back(scanner);
_running_thread++;
assigned_thread_num++;
diff --git a/be/src/vec/exec/volap_scan_node.h
b/be/src/vec/exec/volap_scan_node.h
index 09f77364ed..9557d5c1e7 100644
--- a/be/src/vec/exec/volap_scan_node.h
+++ b/be/src/vec/exec/volap_scan_node.h
@@ -39,6 +39,7 @@ public:
}
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
Status close(RuntimeState* state) override;
+
private:
void transfer_thread(RuntimeState* state);
void scanner_thread(VOlapScanner* scanner);
@@ -66,6 +67,8 @@ private:
std::shared_ptr<MemTracker> _block_mem_tracker;
int _max_materialized_blocks;
+
+ size_t _block_size = 0;
};
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]