This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 51abaa8 [fix](vec) Fix some bugs about vec engine (#7884)
51abaa8 is described below
commit 51abaa89f3d828dfdb8e6dfeef0d1424e28cdf05
Author: HappenLee <[email protected]>
AuthorDate: Thu Feb 3 19:21:17 2022 +0800
[fix](vec) Fix some bugs about vec engine (#7884)
1. Fix a memory leak in VCollectIterator (a child iterator was not deleted when its init failed)
2. Fix slow queries on aggregate tables with a small limit (e.g. limit 10)
3. Fix slow queries in SSB Q4, Q5 and Q6
---
be/src/exec/olap_scan_node.h | 3 ++-
be/src/exec/olap_scanner.cpp | 3 +++
be/src/olap/reader.cpp | 1 +
be/src/olap/reader.h | 5 ++++
be/src/olap/rowset/beta_rowset_reader.cpp | 33 ++++++++++++-----------
be/src/olap/rowset/rowset_reader_context.h | 2 ++
be/src/olap/storage_engine.cpp | 10 +++----
be/src/olap/tablet_schema.cpp | 3 ++-
be/src/vec/columns/column_string.cpp | 1 +
be/src/vec/exec/volap_scan_node.cpp | 42 ++++++++++++++++++------------
be/src/vec/exec/volap_scanner.cpp | 6 +++++
be/src/vec/exec/volap_scanner.h | 10 +++++--
be/src/vec/olap/block_reader.cpp | 9 ++++---
be/src/vec/olap/block_reader.h | 3 ---
be/src/vec/olap/vcollect_iterator.cpp | 1 +
15 files changed, 85 insertions(+), 47 deletions(-)
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index d57a92d..82e98d5 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -160,7 +160,7 @@ protected:
RuntimeProfile* profile);
friend class OlapScanner;
- friend class doris::vectorized::VOlapScanner;
+ friend class vectorized::VOlapScanner;
// Tuple id resolved in prepare() to set _tuple_desc;
TupleId _tuple_id;
@@ -239,6 +239,7 @@ protected:
SpinLock _status_mutex;
Status _status;
RuntimeState* _runtime_state;
+
RuntimeProfile::Counter* _scan_timer;
RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
RuntimeProfile::Counter* _tablet_counter;
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index a1efc1d..d7dc839 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -59,6 +59,9 @@ Status OlapScanner::prepare(
const std::vector<std::pair<string,
std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters) {
set_tablet_reader();
+ // set limit to reduce end of rowset and segment mem use
+ _tablet_reader->set_batch_size(_parent->limit() == -1 ?
_parent->_runtime_state->batch_size() : std::min(
+ static_cast<int64_t>(_parent->_runtime_state->batch_size()),
_parent->limit()));
// Get olap table
TTabletId tablet_id = scan_range.tablet_id;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 13e50b8..4deda90 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -222,6 +222,7 @@ OLAPStatus TabletReader::_capture_rs_readers(const
ReaderParams& read_params,
_reader_context.runtime_state = read_params.runtime_state;
_reader_context.use_page_cache = read_params.use_page_cache;
_reader_context.sequence_id_idx = _sequence_col_idx;
+ _reader_context.batch_size = _batch_size;
*valid_rs_readers = *rs_readers;
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 82cd7ff..3137e06 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -133,6 +133,10 @@ public:
_stats.rows_vec_del_cond_filtered;
}
+ void set_batch_size(int batch_size) {
+ _batch_size = batch_size;
+ }
+
const OlapReaderStatistics& stats() const { return _stats; }
OlapReaderStatistics* mutable_stats() { return &_stats; }
@@ -210,6 +214,7 @@ protected:
bool _filter_delete = false;
int32_t _sequence_col_idx = -1;
bool _direct_mode = false;
+ int _batch_size = 1024;
CollectIterator _collect_iter;
std::vector<uint32_t> _key_cids;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 4d35f2f..263a4cc 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -131,20 +131,23 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext*
read_context) {
_iterator.reset(final_iterator);
// init input block
- _input_block.reset(new RowBlockV2(schema, 1024, _parent_tracker));
-
- // init input/output block and row
- _output_block.reset(new RowBlock(read_context->tablet_schema,
_parent_tracker));
-
- RowBlockInfo output_block_info;
- output_block_info.row_num = 1024;
- output_block_info.null_supported = true;
- // the output block's schema should be seek_columns to conform to v1
- // TODO(hkp): this should be optimized to use return_columns
- output_block_info.column_ids = *(_context->seek_columns);
- _output_block->init(output_block_info);
- _row.reset(new RowCursor());
- RETURN_NOT_OK(_row->init(*(read_context->tablet_schema),
*(_context->seek_columns)));
+ _input_block.reset(new RowBlockV2(schema,
+ std::min(1024, read_context->batch_size), _parent_tracker));
+
+ if (!read_context->is_vec) {
+ // init input/output block and row
+ _output_block.reset(new RowBlock(read_context->tablet_schema,
_parent_tracker));
+
+ RowBlockInfo output_block_info;
+ output_block_info.row_num = std::min(1024, read_context->batch_size);
+ output_block_info.null_supported = true;
+ // the output block's schema should be seek_columns to conform to v1
+ // TODO(hkp): this should be optimized to use return_columns
+ output_block_info.column_ids = *(_context->seek_columns);
+ _output_block->init(output_block_info);
+ _row.reset(new RowCursor());
+ RETURN_NOT_OK(_row->init(*(read_context->tablet_schema),
*(_context->seek_columns)));
+ }
return OLAP_SUCCESS;
}
@@ -211,7 +214,7 @@ OLAPStatus BetaRowsetReader::next_block(vectorized::Block*
block) {
}
}
is_first = false;
- } while (block->rows() < _context->runtime_state->batch_size()); // here
we should keep block.rows() < batch_size
+ } while (block->rows() < _context->batch_size); // here we should keep
block.rows() < batch_size
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index cc98419..07d9340 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -61,6 +61,8 @@ struct RowsetReaderContext {
RuntimeState* runtime_state = nullptr;
bool use_page_cache = false;
int sequence_id_idx = -1;
+ int batch_size = 1024;
+ bool is_vec = false;
};
} // namespace doris
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index aa1af9c..aeac350 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -570,11 +570,11 @@ void StorageEngine::stop() {
THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread);
#undef THREAD_JOIN
-#define THREADS_JOIN(threads) \
- for (const auto& thread : threads) { \
- if (thread) { \
- thread->join(); \
- } \
+#define THREADS_JOIN(threads) \
+ for (const auto& thread : threads) {\
+ if (thread) { \
+ thread->join(); \
+ } \
}
THREADS_JOIN(_path_gc_threads);
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 5b407d5..9ad710d 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -497,7 +497,8 @@ vectorized::Block TabletSchema::create_block(const
std::vector<uint32_t>& return
for (int i = 0; i < return_columns.size(); ++i) {
const auto& col = _cols[return_columns[i]];
auto data_type = vectorized::IDataType::from_olap_engine(col.type(),
col.is_nullable());
- block.insert({data_type->create_column(), data_type, col.name()});
+ auto column = data_type->create_column();
+ block.insert({std::move(column), data_type, col.name()});
}
return block;
}
diff --git a/be/src/vec/columns/column_string.cpp
b/be/src/vec/columns/column_string.cpp
index afd3f23..9ebf879 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -323,6 +323,7 @@ void ColumnString::replicate(const uint32_t* counts, size_t
target_size, IColumn
void ColumnString::reserve(size_t n) {
offsets.reserve(n);
+ chars.reserve(n);
}
void ColumnString::resize(size_t n) {
diff --git a/be/src/vec/exec/volap_scan_node.cpp
b/be/src/vec/exec/volap_scan_node.cpp
index b365c1d..77f0213 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -62,12 +62,19 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
_total_assign_num = 0;
_nice = 18 + std::max(0, 2 - (int)_volap_scanners.size() / 5);
- auto block_per_scanner = (config::doris_scanner_row_num +
(state->batch_size() - 1)) / state->batch_size();
- for (int i = 0; i < _volap_scanners.size() * block_per_scanner; ++i) {
+ auto doris_scanner_row_num = _limit == -1 ? config::doris_scanner_row_num :
+ std::min(static_cast<int64_t>(config::doris_scanner_row_num),
_limit);
+ auto block_size = _limit == -1 ? state->batch_size() :
+ std::min(static_cast<int64_t>(state->batch_size()), _limit);
+ auto block_per_scanner = (doris_scanner_row_num + (block_size - 1)) /
block_size;
+ auto pre_block_count =
+ std::min(_volap_scanners.size(),
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) *
block_per_scanner;
+
+ for (int i = 0; i < pre_block_count; ++i) {
auto block = new Block;
for (const auto slot_desc : _tuple_desc->slots()) {
auto column_ptr = slot_desc->get_empty_mutable_column();
- column_ptr->reserve(state->batch_size());
+ column_ptr->reserve(block_size);
block->insert(ColumnWithTypeAndName(std::move(column_ptr),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
@@ -240,16 +247,11 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner)
{
_scan_blocks.insert(_scan_blocks.end(), blocks.begin(),
blocks.end());
}
// If eos is true, we will process out of this lock block.
- if (!eos) {
- std::lock_guard<std::mutex> l(_volap_scanners_lock);
- _volap_scanners.push_front(scanner);
- }
+ if (eos) { scanner->mark_to_need_to_close(); }
+ std::lock_guard<std::mutex> l(_volap_scanners_lock);
+ _volap_scanners.push_front(scanner);
}
if (eos) {
- // close out of blocks lock. we do this before _progress update
- // that can assure this object can keep live before we finish.
- scanner->close(_runtime_state);
-
std::lock_guard<std::mutex> l(_scan_blocks_lock);
_progress.update(1);
if (_progress.done()) {
@@ -520,18 +522,26 @@ int
VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
size_t thread_slot_num = 0;
{
std::lock_guard<std::mutex> l(_free_blocks_lock);
- thread_slot_num = (_free_blocks.size() - (assigned_thread_num *
block_per_scanner)) / block_per_scanner;
+ thread_slot_num = _free_blocks.size() / block_per_scanner;
+ thread_slot_num += (_free_blocks.size() % block_per_scanner != 0);
if (thread_slot_num == 0) thread_slot_num++;
}
{
std::lock_guard<std::mutex> l(_volap_scanners_lock);
thread_slot_num = std::min(thread_slot_num,
_volap_scanners.size());
- for (int i = 0; i < thread_slot_num; ++i) {
- olap_scanners.push_back(_volap_scanners.front());
+ for (int i = 0; i < thread_slot_num && !_volap_scanners.empty();) {
+ auto scanner = _volap_scanners.front();
_volap_scanners.pop_front();
- _running_thread++;
- assigned_thread_num++;
+
+ if (scanner->need_to_close())
+ scanner->close(state);
+ else {
+ olap_scanners.push_back(scanner);
+ _running_thread++;
+ assigned_thread_num++;
+ i++;
+ }
}
}
}
diff --git a/be/src/vec/exec/volap_scanner.cpp
b/be/src/vec/exec/volap_scanner.cpp
index 1b4bb02..7b5b31e 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -19,6 +19,8 @@
#include <memory>
+#include "runtime/runtime_state.h"
+
#include "vec/columns/column_complex.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
@@ -69,6 +71,10 @@ Status VOlapScanner::get_block(RuntimeState* state,
vectorized::Block* block, bo
return Status::OK();
}
+void VOlapScanner::set_tablet_reader() {
+ _tablet_reader = std::make_unique<BlockReader>();
+}
+
void
VOlapScanner::_convert_row_to_block(std::vector<vectorized::MutableColumnPtr>*
columns) {
size_t slots_size = _query_slots.size();
for (int i = 0; i < slots_size; ++i) {
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index 5efaf9d..0c1c4ad 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -36,19 +36,25 @@ public:
bool need_agg_finalize, const TPaloScanRange& scan_range);
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eof);
- Status get_batch(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+
+ Status get_batch(RuntimeState* state, RowBatch* row_batch, bool* eos)
override {
return Status::NotSupported("Not Implemented VOlapScanNode
Node::get_next scalar");
}
VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; }
+ void mark_to_need_to_close() { _need_to_close = true; }
+
+ bool need_to_close() { return _need_to_close; }
+
protected:
- virtual void set_tablet_reader() { _tablet_reader =
std::make_unique<BlockReader>(); }
+ virtual void set_tablet_reader() override;
private:
// TODO: Remove this function after we finish reader vec
void _convert_row_to_block(std::vector<vectorized::MutableColumnPtr>*
columns);
VExprContext* _vconjunct_ctx = nullptr;
+ bool _need_to_close = false;
};
} // namespace vectorized
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 8fda9d6..37f3f7b 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -50,6 +50,8 @@ OLAPStatus BlockReader::_init_collect_iter(const
ReaderParams& read_params,
return res;
}
+ _reader_context.batch_size = _batch_size;
+ _reader_context.is_vec = true;
for (auto& rs_reader : rs_readers) {
RETURN_NOT_OK(rs_reader->init(&_reader_context));
OLAPStatus res = _vcollect_iter.add_child(rs_reader);
@@ -76,8 +78,8 @@ void BlockReader::_init_agg_state(const ReaderParams&
read_params) {
return;
}
- _stored_data_block =
_next_row.block->create_same_struct_block(_batch_size);
- _stored_data_columns = _stored_data_block->mutate_columns();
+ _stored_data_columns =
+
_next_row.block->create_same_struct_block(_batch_size)->mutate_columns();
_stored_has_null_tag.resize(_stored_data_columns.size());
_stored_has_string_tag.resize(_stored_data_columns.size());
@@ -102,7 +104,6 @@ void BlockReader::_init_agg_state(const ReaderParams&
read_params) {
_next_row.block->get_data_type(idx)->is_nullable());
DCHECK(function != nullptr);
_agg_functions.push_back(function);
-
// create aggregate data
AggregateDataPtr place = new char[function->size_of_data()];
function->create(place);
@@ -120,7 +121,6 @@ void BlockReader::_init_agg_state(const ReaderParams&
read_params) {
OLAPStatus BlockReader::init(const ReaderParams& read_params) {
TabletReader::init(read_params);
- _batch_size = read_params.runtime_state->batch_size();
auto return_column_size =
read_params.origin_return_columns->size() - (_sequence_col_idx !=
-1 ? 1 : 0);
@@ -231,6 +231,7 @@ OLAPStatus BlockReader::_agg_key_next_block(Block* block,
MemPool* mem_pool, Obj
_merged_rows += target_block_row;
return OLAP_SUCCESS;
}
+
OLAPStatus BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool,
ObjectPool* agg_pool, bool*
eof) {
if (UNLIKELY(_eof)) {
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index b1bc7e8..e03706f 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -95,12 +95,9 @@ private:
std::vector<int> _agg_columns_idx;
std::vector<int> _return_columns_loc;
- int _batch_size = 0;
-
std::vector<int> _agg_data_counters;
int _last_agg_data_counter = 0;
- std::unique_ptr<Block> _stored_data_block;
MutableColumns _stored_data_columns;
std::vector<IteratorRowRef> _stored_row_ref;
diff --git a/be/src/vec/olap/vcollect_iterator.cpp
b/be/src/vec/olap/vcollect_iterator.cpp
index 682a9ab..7efd200 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -54,6 +54,7 @@ void
VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers
for (auto [c_iter, r_iter] = std::pair {_children.begin(),
rs_readers.begin()};
c_iter != _children.end();) {
if ((*c_iter)->init() != OLAP_SUCCESS) {
+ delete (*c_iter);
c_iter = _children.erase(c_iter);
r_iter = rs_readers.erase(r_iter);
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]