This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit d40fcdc58d2be1031534c669aa61f8a96379ca58 Author: yiguolei <[email protected]> AuthorDate: Mon May 30 16:04:40 2022 +0800 [Improvement] optimize scannode concurrency query performance in vectorized engine. (#9792) --- be/src/common/config.h | 4 ++++ be/src/exec/olap_scan_node.cpp | 14 ++++++++---- be/src/exec/olap_scan_node.h | 9 ++++++-- be/src/exec/olap_scanner.cpp | 1 - be/src/olap/bloom_filter_predicate.h | 17 +++++++++++++- be/src/vec/exec/volap_scan_node.cpp | 44 ++++++++++++++++++++++++++++-------- 6 files changed, 70 insertions(+), 19 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 48582beabe..90faf93006 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -718,6 +718,10 @@ CONF_Int32(object_pool_buffer_size, "100"); // ParquetReaderWrap prefetch buffer size CONF_Int32(parquet_reader_max_buffer_size, "50"); +// When the number of rows reaches this limit, the filter rate of the bloomfilter is checked; +// if it is lower than a specific threshold, the predicate will be disabled. 
+CONF_mInt32(bloom_filter_predicate_check_row_num, "1000"); + } // namespace config } // namespace doris diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 0d40d40b3a..7ea7ca3510 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -175,6 +175,8 @@ Status OlapScanNode::prepare(RuntimeState* state) { // create scanner profile // create timer _tablet_counter = ADD_COUNTER(runtime_profile(), "TabletCount ", TUnit::UNIT); + _scanner_sched_counter = ADD_COUNTER(runtime_profile(), "ScannerSchedCount ", TUnit::UNIT); + _rows_pushed_cond_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsPushedCondFiltered", TUnit::UNIT); _init_counter(state); @@ -679,11 +681,11 @@ Status OlapScanNode::build_scan_key() { return Status::OK(); } -static Status get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range, - int block_row_count, bool is_begin_include, bool is_end_include, - const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range, - std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range, - RuntimeProfile* profile) { +Status OlapScanNode::get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range, + int block_row_count, bool is_begin_include, bool is_end_include, + const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range, + std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range, + RuntimeProfile* profile) { RuntimeProfile::Counter* show_hints_timer = profile->get_counter("ShowHintsTime_V1"); std::vector<std::vector<OlapTuple>> ranges; bool have_valid_range = false; @@ -1439,6 +1441,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { std::bind(&OlapScanNode::scanner_thread, this, *iter)); if (s.ok()) { (*iter)->start_wait_worker_timer(); + COUNTER_UPDATE(_scanner_sched_counter, 1); olap_scanners.erase(iter++); } else { LOG(FATAL) << "Failed to assign scanner task to thread pool! 
" @@ -1453,6 +1456,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { task.priority = _nice; task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk()); (*iter)->start_wait_worker_timer(); + COUNTER_UPDATE(_scanner_sched_counter, 1); if (thread_pool->offer(task)) { olap_scanners.erase(iter++); } else { diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 1220a742e0..4cd2902b93 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -57,7 +57,12 @@ public: Status collect_query_statistics(QueryStatistics* statistics) override; Status close(RuntimeState* state) override; Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; - inline void set_no_agg_finalize() { _need_agg_finalize = false; } + void set_no_agg_finalize() { _need_agg_finalize = false; } + Status get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range, int block_row_count, + bool is_begin_include, bool is_end_include, + const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range, + std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range, + RuntimeProfile* profile); protected: struct HeapType { @@ -246,7 +251,7 @@ protected: RuntimeProfile::Counter* _tablet_counter; RuntimeProfile::Counter* _rows_pushed_cond_filtered_counter = nullptr; RuntimeProfile::Counter* _reader_init_timer = nullptr; - + RuntimeProfile::Counter* _scanner_sched_counter = nullptr; TResourceInfo* _resource_info; std::atomic<int64_t> _buffered_bytes; diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 9baef00001..4e2003ae0b 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -308,7 +308,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { if (UNLIKELY(*eof)) { break; } - _num_rows_read++; _convert_row_to_tuple(tuple); diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h index 
c8fbdab94c..a7671f2724 100644 --- a/be/src/olap/bloom_filter_predicate.h +++ b/be/src/olap/bloom_filter_predicate.h @@ -71,6 +71,9 @@ public: private: std::shared_ptr<IBloomFilterFuncBase> _filter; SpecificFilter* _specific_filter; // owned by _filter + mutable uint64_t _evaluated_rows = 1; + mutable uint64_t _passed_rows = 0; + mutable bool _enable_pred = true; }; // bloom filter column predicate do not support in segment v1 @@ -113,7 +116,9 @@ void BloomFilterColumnPredicate<T>::evaluate(vectorized::IColumn& column, uint16 uint16_t* size) const { uint16_t new_size = 0; using FT = typename PredicatePrimitiveTypeTraits<T>::PredicateFieldType; - + if (!_enable_pred) { + return; + } if (column.is_nullable()) { auto* nullable_col = vectorized::check_and_get_column<vectorized::ColumnNullable>(column); auto& null_map_data = nullable_col->get_null_map_column().get_data(); @@ -158,6 +163,16 @@ void BloomFilterColumnPredicate<T>::evaluate(vectorized::IColumn& column, uint16 new_size += _specific_filter->find_olap_engine(cell_value); } } + // If the pass rate is very high, for example > 50%, then the bloomfilter is useless. + // Some bloomfilter is useless, for example ssb 4.3, it consumes a lot of cpu but it is + // useless. 
+ _evaluated_rows += *size; + _passed_rows += new_size; + if (_evaluated_rows > config::bloom_filter_predicate_check_row_num) { + if (_passed_rows / (_evaluated_rows * 1.0) > 0.5) { + _enable_pred = false; + } + } *size = new_size; } diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 3156324d66..c54312d16d 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -196,9 +196,13 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { int64_t raw_bytes_read = 0; int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; bool get_free_block = true; + int num_rows_in_block = 0; - while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold && - get_free_block) { + // Has to wait at least one full block, or it will cause a lot of schedule task in priority + // queue, it will affect query latency and query concurrency for example ssb 3.3. + while (!eos && ((raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold && + get_free_block) || + num_rows_in_block < _runtime_state->batch_size())) { if (UNLIKELY(_transfer_done)) { eos = true; status = Status::Cancelled("Cancelled"); @@ -218,7 +222,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { } raw_bytes_read += block->allocated_bytes(); - + num_rows_in_block += block->rows(); // 4. if status not ok, change status_. 
if (UNLIKELY(block->rows() == 0)) { std::lock_guard<std::mutex> l(_free_blocks_lock); @@ -324,6 +328,12 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) { if (cond_ranges.empty()) { cond_ranges.emplace_back(new OlapScanRange()); } + bool need_split = true; + // If we have ranges more than 64, there is no need to call + // ShowHint to split ranges + if (limit() != -1 || cond_ranges.size() > 64) { + need_split = false; + } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); std::unordered_set<std::string> disk_set; @@ -341,6 +351,16 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) { return Status::InternalError(ss.str()); } + std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges; + std::vector<std::unique_ptr<OlapScanRange>> split_ranges; + if (need_split && !tablet->all_beta()) { + auto st = get_hints(tablet, *scan_range, config::doris_scan_range_row_count, + _scan_keys.begin_include(), _scan_keys.end_include(), cond_ranges, + &split_ranges, _runtime_profile.get()); + if (st.ok()) { + ranges = &split_ranges; + } + } int size_based_scanners_per_tablet = 1; if (config::doris_scan_range_max_mb > 0) { @@ -349,17 +369,17 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) { } int ranges_per_scanner = - std::max(1, (int)cond_ranges.size() / + std::max(1, (int)ranges->size() / std::min(scanners_per_tablet, size_based_scanners_per_tablet)); - int num_ranges = cond_ranges.size(); + int num_ranges = ranges->size(); for (int i = 0; i < num_ranges;) { std::vector<OlapScanRange*> scanner_ranges; - scanner_ranges.push_back(cond_ranges[i].get()); + scanner_ranges.push_back((*ranges)[i].get()); ++i; for (int j = 1; i < num_ranges && j < ranges_per_scanner && - cond_ranges[i]->end_include == cond_ranges[i - 1]->end_include; + (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include; ++j, ++i) { - scanner_ranges.push_back(cond_ranges[i].get()); + scanner_ranges.push_back((*ranges)[i].get()); } VOlapScanner* 
scanner = new VOlapScanner(state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range); @@ -551,8 +571,11 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) { int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) { std::list<VOlapScanner*> olap_scanners; int assigned_thread_num = _running_thread; - size_t max_thread = std::min(_volap_scanners.size(), - static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)); + size_t max_thread = config::doris_scanner_queue_size; + if (config::doris_scanner_row_num > state->batch_size()) { + max_thread /= config::doris_scanner_row_num / state->batch_size(); + if (max_thread <= 0) max_thread = 1; + } // copy to local { // How many thread can apply to this query @@ -606,6 +629,7 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per task.priority = _nice; task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk()); (*iter)->start_wait_worker_timer(); + COUNTER_UPDATE(_scanner_sched_counter, 1); if (thread_pool->offer(task)) { olap_scanners.erase(iter++); } else { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
