This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3eedd15 [optimize] optimze tablet read, avoid to create too much
scanner for small tablet (#8096)
3eedd15 is described below
commit 3eedd15f9c28ea6c853242714ec787b13f69499a
Author: Zhengguo Yang <[email protected]>
AuthorDate: Tue Mar 8 13:59:45 2022 +0800
[optimize] optimze tablet read, avoid to create too much scanner for small
tablet (#8096)
---
be/src/common/config.h | 7 +-
be/src/exec/olap_scan_node.cpp | 54 ++++++++-------
be/src/exec/olap_scan_node.h | 6 --
be/src/olap/tablet.h | 5 +-
be/src/olap/tablet_meta.h | 19 +++++-
be/src/vec/exec/volap_scan_node.cpp | 78 ++++++++++++----------
be/src/vec/exec/volap_scanner.cpp | 8 +--
docs/en/administrator-guide/config/be_config.md | 4 ++
docs/zh-CN/administrator-guide/config/be_config.md | 5 ++
9 files changed, 112 insertions(+), 74 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7b67e4a..3fdb555 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -163,8 +163,10 @@ CONF_Int32(etl_thread_pool_queue_size, "256");
CONF_mInt32(thrift_connect_timeout_seconds, "3");
// default thrift client retry interval (in milliseconds)
CONF_mInt64(thrift_client_retry_interval_ms, "1000");
-// max row count number for single scan range
+// max row count number for single scan range, used in segmentv1
CONF_mInt32(doris_scan_range_row_count, "524288");
+// max bytes number for single scan range, used in segmentv2
+CONF_mInt32(doris_scan_range_max_mb, "0");
// size of scanner queue between scanner thread and compute thread
CONF_mInt32(doris_scanner_queue_size, "1024");
// single read execute fragment row size
@@ -298,7 +300,8 @@ CONF_mInt32(compaction_task_num_per_disk, "2");
// compaction thread num for fast disk(typically .SSD), must be greater than 2.
CONF_mInt32(compaction_task_num_per_fast_disk, "4");
CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool {
return config >= 2; });
-CONF_Validator(compaction_task_num_per_fast_disk, [](const int config) -> bool
{ return config >= 2; });
+CONF_Validator(compaction_task_num_per_fast_disk,
+ [](const int config) -> bool { return config >= 2; });
// How many rounds of cumulative compaction for each round of base compaction
when compaction tasks generation.
CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index af26ae1..81d0bfb 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -675,24 +675,11 @@ Status OlapScanNode::build_scan_key() {
return Status::OK();
}
-Status OlapScanNode::get_hints(const TPaloScanRange& scan_range, int
block_row_count,
- bool is_begin_include, bool is_end_include,
- const
std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range,
- std::vector<std::unique_ptr<OlapScanRange>>*
sub_scan_range,
- RuntimeProfile* profile) {
- auto tablet_id = scan_range.tablet_id;
- int32_t schema_hash = strtoul(scan_range.schema_hash.c_str(), nullptr, 10);
- std::string err;
- TabletSharedPtr table =
StorageEngine::instance()->tablet_manager()->get_tablet(
- tablet_id, schema_hash, true, &err);
- if (table == nullptr) {
- std::stringstream ss;
- ss << "failed to get tablet: " << tablet_id << " with schema hash: "
<< schema_hash
- << ", reason: " << err;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
-
+static Status get_hints(TabletSharedPtr table, const TPaloScanRange&
scan_range,
+ int block_row_count, bool is_begin_include, bool
is_end_include,
+ const std::vector<std::unique_ptr<OlapScanRange>>&
scan_key_range,
+ std::vector<std::unique_ptr<OlapScanRange>>*
sub_scan_range,
+ RuntimeProfile* profile) {
RuntimeProfile::Counter* show_hints_timer =
profile->get_counter("ShowHintsTime_V1");
std::vector<std::vector<OlapTuple>> ranges;
bool have_valid_range = false;
@@ -772,21 +759,42 @@ Status OlapScanNode::start_scan_thread(RuntimeState*
state) {
}
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
-
std::unordered_set<std::string> disk_set;
for (auto& scan_range : _scan_ranges) {
+ auto tablet_id = scan_range->tablet_id;
+ int32_t schema_hash = strtoul(scan_range->schema_hash.c_str(),
nullptr, 10);
+ std::string err;
+ TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
+ tablet_id, schema_hash, true, &err);
+ if (tablet == nullptr) {
+ std::stringstream ss;
+ ss << "failed to get tablet: " << tablet_id << " with schema hash:
" << schema_hash
+ << ", reason: " << err;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
- if (need_split) {
- auto st = get_hints(*scan_range,
config::doris_scan_range_row_count,
+ if (need_split && !tablet->all_beta()) {
+ auto st = get_hints(tablet, *scan_range,
config::doris_scan_range_row_count,
_scan_keys.begin_include(),
_scan_keys.end_include(), cond_ranges,
&split_ranges, _runtime_profile.get());
if (st.ok()) {
ranges = &split_ranges;
}
}
-
- int ranges_per_scanner = std::max(1, (int)ranges->size() /
scanners_per_tablet);
+ // In order to avoid the problem of too many scanners caused by small
tablets,
+ // in addition to scanRange, we also need to consider the size of the
tablet when
+ // creating the scanner. One scanner is used for every 1Gb, and the
final scanner_per_tablet
+ // takes the minimum value calculated by scanrange and size.
+ int size_based_scanners_per_tablet = 1;
+ if (config::doris_scan_range_max_mb > 0) {
+ size_based_scanners_per_tablet = std::max(
+ 1, (int)tablet->tablet_footprint() /
config::doris_scan_range_max_mb << 20);
+ }
+ int ranges_per_scanner =
+ std::max(1, (int)ranges->size() /
+ std::min(scanners_per_tablet,
size_based_scanners_per_tablet));
int num_ranges = ranges->size();
for (int i = 0; i < num_ranges;) {
std::vector<OlapScanRange*> scanner_ranges;
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 5ebd647..6d8d89b 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -154,12 +154,6 @@ protected:
std::pair<bool, void*> should_push_down_eq_predicate(SlotDescriptor* slot,
Expr* pred,
int conj_idx, int
child_idx);
- static Status get_hints(const TPaloScanRange& scan_range, int
block_row_count,
- bool is_begin_include, bool is_end_include,
- const std::vector<std::unique_ptr<OlapScanRange>>&
scan_key_range,
- std::vector<std::unique_ptr<OlapScanRange>>*
sub_scan_range,
- RuntimeProfile* profile);
-
friend class OlapScanner;
friend class vectorized::VOlapScanner;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 52f58ae..0429136 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -119,7 +119,8 @@ public:
std::vector<Version>* version_path)
const;
OLAPStatus check_version_integrity(const Version& version);
bool check_version_exist(const Version& version) const;
- void acquire_version_and_rowsets(std::vector<std::pair<Version,
RowsetSharedPtr>>* version_rowsets) const;
+ void acquire_version_and_rowsets(
+ std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets)
const;
OLAPStatus capture_consistent_rowsets(const Version& spec_version,
std::vector<RowsetSharedPtr>*
rowsets) const;
@@ -260,6 +261,8 @@ public:
return _cumulative_compaction_policy;
}
+ inline bool all_beta() const { return _tablet_meta->all_beta(); }
+
private:
OLAPStatus _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions)
const;
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 9289ab3..5093875 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -81,7 +81,8 @@ public:
TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id,
int32_t schema_hash,
uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t
next_unique_id,
const std::unordered_map<uint32_t, uint32_t>&
col_ordinal_to_unique_id,
- TabletUid tablet_uid, TTabletType::type tabletType,
TStorageMedium::type t_storage_medium);
+ TabletUid tablet_uid, TTabletType::type tabletType,
+ TStorageMedium::type t_storage_medium);
// If need add a filed in TableMeta, filed init copy in copy construct
function
TabletMeta(const TabletMeta& tablet_meta);
TabletMeta(TabletMeta&& tablet_meta) = delete;
@@ -167,6 +168,8 @@ public:
// used for after tablet cloned to clear stale rowset
void clear_stale_rowset() { _stale_rs_metas.clear(); }
+ inline bool all_beta() const;
+
private:
OLAPStatus _save_meta(DataDir* data_dir);
void _init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn,
ColumnPB* column);
@@ -302,6 +305,20 @@ inline const std::vector<RowsetMetaSharedPtr>&
TabletMeta::all_stale_rs_metas()
return _stale_rs_metas;
}
+inline bool TabletMeta::all_beta() const {
+ for (auto& rs : _rs_metas) {
+ if (rs->rowset_type() != RowsetTypePB::BETA_ROWSET) {
+ return false;
+ }
+ }
+ for (auto& rs : _stale_rs_metas) {
+ if (rs->rowset_type() != RowsetTypePB::BETA_ROWSET) {
+ return false;
+ }
+ }
+ return true;
+}
+
// Only for unit test now.
bool operator==(const TabletMeta& a, const TabletMeta& b);
bool operator!=(const TabletMeta& a, const TabletMeta& b);
diff --git a/be/src/vec/exec/volap_scan_node.cpp
b/be/src/vec/exec/volap_scan_node.cpp
index 481a630..953e5b7 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -62,22 +62,24 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
_total_assign_num = 0;
_nice = 18 + std::max(0, 2 - (int)_volap_scanners.size() / 5);
- auto doris_scanner_row_num = _limit == -1 ? config::doris_scanner_row_num :
- std::min(static_cast<int64_t>(config::doris_scanner_row_num),
_limit);
- auto block_size = _limit == -1 ? state->batch_size() :
- std::min(static_cast<int64_t>(state->batch_size()), _limit);
+ auto doris_scanner_row_num =
+ _limit == -1 ? config::doris_scanner_row_num
+ :
std::min(static_cast<int64_t>(config::doris_scanner_row_num), _limit);
+ auto block_size = _limit == -1 ? state->batch_size()
+ :
std::min(static_cast<int64_t>(state->batch_size()), _limit);
auto block_per_scanner = (doris_scanner_row_num + (block_size - 1)) /
block_size;
auto pre_block_count =
- std::min(_volap_scanners.size(),
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) *
block_per_scanner;
+ std::min(_volap_scanners.size(),
+
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) *
+ block_per_scanner;
for (int i = 0; i < pre_block_count; ++i) {
auto block = new Block;
for (const auto slot_desc : _tuple_desc->slots()) {
auto column_ptr = slot_desc->get_empty_mutable_column();
column_ptr->reserve(block_size);
- block->insert(ColumnWithTypeAndName(std::move(column_ptr),
-
slot_desc->get_data_type_ptr(),
- slot_desc->col_name()));
+ block->insert(ColumnWithTypeAndName(
+ std::move(column_ptr), slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
_free_blocks.emplace_back(block);
_buffered_bytes += block->allocated_bytes();
@@ -247,7 +249,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
_scan_blocks.insert(_scan_blocks.end(), blocks.begin(),
blocks.end());
}
// If eos is true, we will process out of this lock block.
- if (eos) { scanner->mark_to_need_to_close(); }
+ if (eos) {
+ scanner->mark_to_need_to_close();
+ }
std::lock_guard<std::mutex> l(_volap_scanners_lock);
_volap_scanners.push_front(scanner);
}
@@ -301,43 +305,45 @@ Status VOlapScanNode::start_scan_thread(RuntimeState*
state) {
if (cond_ranges.empty()) {
cond_ranges.emplace_back(new OlapScanRange());
}
-
- bool need_split = true;
- // If we have ranges more than 64, there is no need to call
- // ShowHint to split ranges
- if (limit() != -1 || cond_ranges.size() > 64) {
- need_split = false;
- }
-
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
std::unordered_set<std::string> disk_set;
for (auto& scan_range : _scan_ranges) {
- std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
- std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
- if (need_split) {
- auto st = OlapScanNode::get_hints(*scan_range,
config::doris_scan_range_row_count,
- _scan_keys.begin_include(),
_scan_keys.end_include(),
- cond_ranges, &split_ranges,
_runtime_profile.get());
- if (st.ok()) {
- ranges = &split_ranges;
- }
+ auto tablet_id = scan_range->tablet_id;
+ int32_t schema_hash = strtoul(scan_range->schema_hash.c_str(),
nullptr, 10);
+ std::string err;
+ TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
+ tablet_id, schema_hash, true, &err);
+ if (tablet == nullptr) {
+ std::stringstream ss;
+ ss << "failed to get tablet: " << tablet_id << " with schema hash:
" << schema_hash
+ << ", reason: " << err;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
}
- int ranges_per_scanner = std::max(1, (int)ranges->size() /
scanners_per_tablet);
- int num_ranges = ranges->size();
+ int size_based_scanners_per_tablet = 1;
+
+ if (config::doris_scan_range_max_mb > 0) {
+ size_based_scanners_per_tablet = std::max(
+ 1, (int)tablet->tablet_footprint() /
config::doris_scan_range_max_mb << 20);
+ }
+
+ int ranges_per_scanner =
+ std::max(1, (int)cond_ranges.size() /
+ std::min(scanners_per_tablet,
size_based_scanners_per_tablet));
+ int num_ranges = cond_ranges.size();
for (int i = 0; i < num_ranges;) {
std::vector<OlapScanRange*> scanner_ranges;
- scanner_ranges.push_back((*ranges)[i].get());
+ scanner_ranges.push_back(cond_ranges[i].get());
++i;
for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
- (*ranges)[i]->end_include == (*ranges)[i -
1]->end_include;
+ cond_ranges[i]->end_include == cond_ranges[i -
1]->end_include;
++j, ++i) {
- scanner_ranges.push_back((*ranges)[i].get());
+ scanner_ranges.push_back(cond_ranges[i].get());
}
- VOlapScanner* scanner =
- new VOlapScanner(state, this,
_olap_scan_node.is_preaggregation,
- _need_agg_finalize, *scan_range);
+ VOlapScanner* scanner = new VOlapScanner(state, this,
_olap_scan_node.is_preaggregation,
+ _need_agg_finalize,
*scan_range);
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare
failed.
_scanner_pool.add(scanner);
@@ -383,7 +389,8 @@ Status VOlapScanNode::close(RuntimeState* state) {
// clear some block in queue
// TODO: The presence of transfer_thread here may cause Block's memory
alloc and be released not in a thread,
// which may lead to potential performance problems. we should rethink
whether to delete the transfer thread
- std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(),
std::default_delete<Block>());
+ std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(),
+ std::default_delete<Block>());
std::for_each(_scan_blocks.begin(), _scan_blocks.end(),
std::default_delete<Block>());
std::for_each(_free_blocks.begin(), _free_blocks.end(),
std::default_delete<Block>());
_mem_tracker->Release(_buffered_bytes);
@@ -566,5 +573,4 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState*
state, int block_per
return assigned_thread_num;
}
-
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/volap_scanner.cpp
b/be/src/vec/exec/volap_scanner.cpp
index d99184a..fbdbd11 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -20,7 +20,6 @@
#include <memory>
#include "runtime/runtime_state.h"
-
#include "vec/columns/column_complex.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
@@ -35,8 +34,7 @@ namespace doris::vectorized {
VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent,
bool aggregation,
bool need_agg_finalize, const TPaloScanRange&
scan_range)
- : OlapScanner(runtime_state, parent, aggregation, need_agg_finalize,
scan_range) {
-}
+ : OlapScanner(runtime_state, parent, aggregation, need_agg_finalize,
scan_range) {}
Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block,
bool* eof) {
// only empty block should be here
@@ -46,8 +44,8 @@ Status VOlapScanner::get_block(RuntimeState* state,
vectorized::Block* block, bo
if (!block->mem_reuse()) {
for (const auto slot_desc : _tuple_desc->slots()) {
block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
-
slot_desc->get_data_type_ptr(),
- slot_desc->col_name()));
+ slot_desc->get_data_type_ptr(),
+ slot_desc->col_name()));
}
}
diff --git a/docs/en/administrator-guide/config/be_config.md
b/docs/en/administrator-guide/config/be_config.md
index 38b68b4..c917492 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -1495,3 +1495,7 @@ The default value is currently only an empirical value,
and may need to be modif
Translated with www.DeepL.com/Translator (free version)
+### `doris_scan_range_max_mb`
+* Type: int32
+* Description: The maximum amount of data read by each OlapScanner.
+* Default: 1024
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md
b/docs/zh-CN/administrator-guide/config/be_config.md
index 8e51bc3..65277f2 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -1511,3 +1511,8 @@ webserver默认工作线程数
* 类型:int32
* 描述:load 作业中各个rpc 的最小超时时间。
* 默认:20
+
+### `doris_scan_range_max_mb`
+* 类型: int32
+* 描述: 每个OlapScanner 读取的最大数据量
+* 默认值: 1024
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]