This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eedd15  [optimize] optimize tablet read, avoid creating too many scanners for small tablets (#8096)
3eedd15 is described below

commit 3eedd15f9c28ea6c853242714ec787b13f69499a
Author: Zhengguo Yang <[email protected]>
AuthorDate: Tue Mar 8 13:59:45 2022 +0800

    [optimize] optimize tablet read, avoid creating too many scanners for small tablets (#8096)
---
 be/src/common/config.h                             |  7 +-
 be/src/exec/olap_scan_node.cpp                     | 54 ++++++++-------
 be/src/exec/olap_scan_node.h                       |  6 --
 be/src/olap/tablet.h                               |  5 +-
 be/src/olap/tablet_meta.h                          | 19 +++++-
 be/src/vec/exec/volap_scan_node.cpp                | 78 ++++++++++++----------
 be/src/vec/exec/volap_scanner.cpp                  |  8 +--
 docs/en/administrator-guide/config/be_config.md    |  4 ++
 docs/zh-CN/administrator-guide/config/be_config.md |  5 ++
 9 files changed, 112 insertions(+), 74 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7b67e4a..3fdb555 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -163,8 +163,10 @@ CONF_Int32(etl_thread_pool_queue_size, "256");
 CONF_mInt32(thrift_connect_timeout_seconds, "3");
 // default thrift client retry interval (in milliseconds)
 CONF_mInt64(thrift_client_retry_interval_ms, "1000");
-// max row count number for single scan range
+// max row count number for single scan range, used in segmentv1
 CONF_mInt32(doris_scan_range_row_count, "524288");
+// max data size (in MB) read by a single scanner; bounds the number of scanners per tablet
+CONF_mInt32(doris_scan_range_max_mb, "0");
 // size of scanner queue between scanner thread and compute thread
 CONF_mInt32(doris_scanner_queue_size, "1024");
 // single read execute fragment row size
@@ -298,7 +300,8 @@ CONF_mInt32(compaction_task_num_per_disk, "2");
 // compaction thread num for fast disk(typically .SSD), must be greater than 2.
 CONF_mInt32(compaction_task_num_per_fast_disk, "4");
 CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool { 
return config >= 2; });
-CONF_Validator(compaction_task_num_per_fast_disk, [](const int config) -> bool 
{ return config >= 2; });
+CONF_Validator(compaction_task_num_per_fast_disk,
+               [](const int config) -> bool { return config >= 2; });
 
 // How many rounds of cumulative compaction for each round of base compaction 
when compaction tasks generation.
 CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index af26ae1..81d0bfb 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -675,24 +675,11 @@ Status OlapScanNode::build_scan_key() {
     return Status::OK();
 }
 
-Status OlapScanNode::get_hints(const TPaloScanRange& scan_range, int 
block_row_count,
-                               bool is_begin_include, bool is_end_include,
-                               const 
std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range,
-                               std::vector<std::unique_ptr<OlapScanRange>>* 
sub_scan_range,
-                               RuntimeProfile* profile) {
-    auto tablet_id = scan_range.tablet_id;
-    int32_t schema_hash = strtoul(scan_range.schema_hash.c_str(), nullptr, 10);
-    std::string err;
-    TabletSharedPtr table = 
StorageEngine::instance()->tablet_manager()->get_tablet(
-            tablet_id, schema_hash, true, &err);
-    if (table == nullptr) {
-        std::stringstream ss;
-        ss << "failed to get tablet: " << tablet_id << " with schema hash: " 
<< schema_hash
-           << ", reason: " << err;
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
-    }
-
+static Status get_hints(TabletSharedPtr table, const TPaloScanRange& 
scan_range,
+                        int block_row_count, bool is_begin_include, bool 
is_end_include,
+                        const std::vector<std::unique_ptr<OlapScanRange>>& 
scan_key_range,
+                        std::vector<std::unique_ptr<OlapScanRange>>* 
sub_scan_range,
+                        RuntimeProfile* profile) {
     RuntimeProfile::Counter* show_hints_timer = 
profile->get_counter("ShowHintsTime_V1");
     std::vector<std::vector<OlapTuple>> ranges;
     bool have_valid_range = false;
@@ -772,21 +759,42 @@ Status OlapScanNode::start_scan_thread(RuntimeState* 
state) {
     }
 
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
-
     std::unordered_set<std::string> disk_set;
     for (auto& scan_range : _scan_ranges) {
+        auto tablet_id = scan_range->tablet_id;
+        int32_t schema_hash = strtoul(scan_range->schema_hash.c_str(), 
nullptr, 10);
+        std::string err;
+        TabletSharedPtr tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
+                tablet_id, schema_hash, true, &err);
+        if (tablet == nullptr) {
+            std::stringstream ss;
+            ss << "failed to get tablet: " << tablet_id << " with schema hash: 
" << schema_hash
+               << ", reason: " << err;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
         std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
         std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
-        if (need_split) {
-            auto st = get_hints(*scan_range, 
config::doris_scan_range_row_count,
+        if (need_split && !tablet->all_beta()) {
+            auto st = get_hints(tablet, *scan_range, 
config::doris_scan_range_row_count,
                                 _scan_keys.begin_include(), 
_scan_keys.end_include(), cond_ranges,
                                 &split_ranges, _runtime_profile.get());
             if (st.ok()) {
                 ranges = &split_ranges;
             }
         }
-
-        int ranges_per_scanner = std::max(1, (int)ranges->size() / 
scanners_per_tablet);
+        // To avoid creating too many scanners for small tablets, the number of scanners per
+        // tablet is also bounded by the tablet size: one per doris_scan_range_max_mb MB, at
+        // least one (if the config is <= 0 the size-based bound stays at 1). The final count
+        // is the minimum of the range-based and the size-based bounds.
+        int size_based_scanners_per_tablet = 1;
+        if (config::doris_scan_range_max_mb > 0) {
+            const int64_t max_bytes = static_cast<int64_t>(config::doris_scan_range_max_mb) << 20;
+            size_based_scanners_per_tablet = std::max(1, (int)(tablet->tablet_footprint() / max_bytes));
+        }
+        int ranges_per_scanner =
+                std::max(1, (int)ranges->size() /
+                                    std::min(scanners_per_tablet, 
size_based_scanners_per_tablet));
         int num_ranges = ranges->size();
         for (int i = 0; i < num_ranges;) {
             std::vector<OlapScanRange*> scanner_ranges;
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 5ebd647..6d8d89b 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -154,12 +154,6 @@ protected:
     std::pair<bool, void*> should_push_down_eq_predicate(SlotDescriptor* slot, 
Expr* pred,
                                                          int conj_idx, int 
child_idx);
 
-    static Status get_hints(const TPaloScanRange& scan_range, int 
block_row_count,
-                            bool is_begin_include, bool is_end_include,
-                            const std::vector<std::unique_ptr<OlapScanRange>>& 
scan_key_range,
-                            std::vector<std::unique_ptr<OlapScanRange>>* 
sub_scan_range,
-                            RuntimeProfile* profile);
-
     friend class OlapScanner;
     friend class vectorized::VOlapScanner;
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 52f58ae..0429136 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -119,7 +119,8 @@ public:
                                            std::vector<Version>* version_path) 
const;
     OLAPStatus check_version_integrity(const Version& version);
     bool check_version_exist(const Version& version) const;
-    void acquire_version_and_rowsets(std::vector<std::pair<Version, 
RowsetSharedPtr>>* version_rowsets) const;
+    void acquire_version_and_rowsets(
+            std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets) 
const;
 
     OLAPStatus capture_consistent_rowsets(const Version& spec_version,
                                           std::vector<RowsetSharedPtr>* 
rowsets) const;
@@ -260,6 +261,8 @@ public:
         return _cumulative_compaction_policy;
     }
 
+    inline bool all_beta() const { return _tablet_meta->all_beta(); }
+
 private:
     OLAPStatus _init_once_action();
     void _print_missed_versions(const std::vector<Version>& missed_versions) 
const;
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 9289ab3..5093875 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -81,7 +81,8 @@ public:
     TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id, 
int32_t schema_hash,
                uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t 
next_unique_id,
                const std::unordered_map<uint32_t, uint32_t>& 
col_ordinal_to_unique_id,
-               TabletUid tablet_uid, TTabletType::type tabletType, 
TStorageMedium::type t_storage_medium);
+               TabletUid tablet_uid, TTabletType::type tabletType,
+               TStorageMedium::type t_storage_medium);
     // If need add a filed in TableMeta, filed init copy in copy construct 
function
     TabletMeta(const TabletMeta& tablet_meta);
     TabletMeta(TabletMeta&& tablet_meta) = delete;
@@ -167,6 +168,8 @@ public:
     // used for after tablet cloned to clear stale rowset
     void clear_stale_rowset() { _stale_rs_metas.clear(); }
 
+    inline bool all_beta() const;
+
 private:
     OLAPStatus _save_meta(DataDir* data_dir);
     void _init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn, 
ColumnPB* column);
@@ -302,6 +305,20 @@ inline const std::vector<RowsetMetaSharedPtr>& 
TabletMeta::all_stale_rs_metas()
     return _stale_rs_metas;
 }
 
+inline bool TabletMeta::all_beta() const {
+    for (auto& rs : _rs_metas) {
+        if (rs->rowset_type() != RowsetTypePB::BETA_ROWSET) {
+            return false;
+        }
+    }
+    for (auto& rs : _stale_rs_metas) {
+        if (rs->rowset_type() != RowsetTypePB::BETA_ROWSET) {
+            return false;
+        }
+    }
+    return true;
+}
+
 // Only for unit test now.
 bool operator==(const TabletMeta& a, const TabletMeta& b);
 bool operator!=(const TabletMeta& a, const TabletMeta& b);
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index 481a630..953e5b7 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -62,22 +62,24 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
     _total_assign_num = 0;
     _nice = 18 + std::max(0, 2 - (int)_volap_scanners.size() / 5);
 
-    auto doris_scanner_row_num = _limit == -1 ? config::doris_scanner_row_num :
-            std::min(static_cast<int64_t>(config::doris_scanner_row_num), 
_limit);
-    auto block_size = _limit == -1 ? state->batch_size() :
-            std::min(static_cast<int64_t>(state->batch_size()), _limit);
+    auto doris_scanner_row_num =
+            _limit == -1 ? config::doris_scanner_row_num
+                         : 
std::min(static_cast<int64_t>(config::doris_scanner_row_num), _limit);
+    auto block_size = _limit == -1 ? state->batch_size()
+                                   : 
std::min(static_cast<int64_t>(state->batch_size()), _limit);
     auto block_per_scanner = (doris_scanner_row_num + (block_size - 1)) / 
block_size;
     auto pre_block_count =
-            std::min(_volap_scanners.size(), 
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) * 
block_per_scanner;
+            std::min(_volap_scanners.size(),
+                     
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) *
+            block_per_scanner;
 
     for (int i = 0; i < pre_block_count; ++i) {
         auto block = new Block;
         for (const auto slot_desc : _tuple_desc->slots()) {
             auto column_ptr = slot_desc->get_empty_mutable_column();
             column_ptr->reserve(block_size);
-            block->insert(ColumnWithTypeAndName(std::move(column_ptr),
-                                                    
slot_desc->get_data_type_ptr(),
-                                                    slot_desc->col_name()));
+            block->insert(ColumnWithTypeAndName(
+                    std::move(column_ptr), slot_desc->get_data_type_ptr(), 
slot_desc->col_name()));
         }
         _free_blocks.emplace_back(block);
         _buffered_bytes += block->allocated_bytes();
@@ -247,7 +249,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
             _scan_blocks.insert(_scan_blocks.end(), blocks.begin(), 
blocks.end());
         }
         // If eos is true, we will process out of this lock block.
-        if (eos) { scanner->mark_to_need_to_close(); }
+        if (eos) {
+            scanner->mark_to_need_to_close();
+        }
         std::lock_guard<std::mutex> l(_volap_scanners_lock);
         _volap_scanners.push_front(scanner);
     }
@@ -301,43 +305,45 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* 
state) {
     if (cond_ranges.empty()) {
         cond_ranges.emplace_back(new OlapScanRange());
     }
-
-    bool need_split = true;
-    // If we have ranges more than 64, there is no need to call
-    // ShowHint to split ranges
-    if (limit() != -1 || cond_ranges.size() > 64) {
-        need_split = false;
-    }
-
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
 
     std::unordered_set<std::string> disk_set;
     for (auto& scan_range : _scan_ranges) {
-        std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
-        std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
-        if (need_split) {
-            auto st = OlapScanNode::get_hints(*scan_range, 
config::doris_scan_range_row_count,
-                                              _scan_keys.begin_include(), 
_scan_keys.end_include(),
-                                              cond_ranges, &split_ranges, 
_runtime_profile.get());
-            if (st.ok()) {
-                ranges = &split_ranges;
-            }
+        auto tablet_id = scan_range->tablet_id;
+        int32_t schema_hash = strtoul(scan_range->schema_hash.c_str(), 
nullptr, 10);
+        std::string err;
+        TabletSharedPtr tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
+                tablet_id, schema_hash, true, &err);
+        if (tablet == nullptr) {
+            std::stringstream ss;
+            ss << "failed to get tablet: " << tablet_id << " with schema hash: 
" << schema_hash
+               << ", reason: " << err;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
         }
 
-        int ranges_per_scanner = std::max(1, (int)ranges->size() / 
scanners_per_tablet);
-        int num_ranges = ranges->size();
+        int size_based_scanners_per_tablet = 1;
+        // Bound scanners by tablet size: one per doris_scan_range_max_mb MB, at least one.
+        if (config::doris_scan_range_max_mb > 0) {
+            const int64_t max_bytes = static_cast<int64_t>(config::doris_scan_range_max_mb) << 20;
+            size_based_scanners_per_tablet = std::max(1, (int)(tablet->tablet_footprint() / max_bytes));
+        }
+
+        int ranges_per_scanner =
+                std::max(1, (int)cond_ranges.size() /
+                                    std::min(scanners_per_tablet, 
size_based_scanners_per_tablet));
+        int num_ranges = cond_ranges.size();
         for (int i = 0; i < num_ranges;) {
             std::vector<OlapScanRange*> scanner_ranges;
-            scanner_ranges.push_back((*ranges)[i].get());
+            scanner_ranges.push_back(cond_ranges[i].get());
             ++i;
             for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
-                            (*ranges)[i]->end_include == (*ranges)[i - 
1]->end_include;
+                            cond_ranges[i]->end_include == cond_ranges[i - 
1]->end_include;
                  ++j, ++i) {
-                scanner_ranges.push_back((*ranges)[i].get());
+                scanner_ranges.push_back(cond_ranges[i].get());
             }
-            VOlapScanner* scanner =
-                    new VOlapScanner(state, this, 
_olap_scan_node.is_preaggregation,
-                                     _need_agg_finalize, *scan_range);
+            VOlapScanner* scanner = new VOlapScanner(state, this, 
_olap_scan_node.is_preaggregation,
+                                                     _need_agg_finalize, 
*scan_range);
             // add scanner to pool before doing prepare.
             // so that scanner can be automatically deconstructed if prepare 
failed.
             _scanner_pool.add(scanner);
@@ -383,7 +389,8 @@ Status VOlapScanNode::close(RuntimeState* state) {
     // clear some block in queue
     // TODO: The presence of transfer_thread here may cause Block's memory 
alloc and be released not in a thread,
     // which may lead to potential performance problems. we should rethink 
whether to delete the transfer thread
-    std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(), 
std::default_delete<Block>());
+    std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(),
+                  std::default_delete<Block>());
     std::for_each(_scan_blocks.begin(), _scan_blocks.end(), 
std::default_delete<Block>());
     std::for_each(_free_blocks.begin(), _free_blocks.end(), 
std::default_delete<Block>());
     _mem_tracker->Release(_buffered_bytes);
@@ -566,5 +573,4 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* 
state, int block_per
     return assigned_thread_num;
 }
 
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/volap_scanner.cpp 
b/be/src/vec/exec/volap_scanner.cpp
index d99184a..fbdbd11 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -20,7 +20,6 @@
 #include <memory>
 
 #include "runtime/runtime_state.h"
-
 #include "vec/columns/column_complex.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_string.h"
@@ -35,8 +34,7 @@ namespace doris::vectorized {
 
 VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, 
bool aggregation,
                            bool need_agg_finalize, const TPaloScanRange& 
scan_range)
-        : OlapScanner(runtime_state, parent, aggregation, need_agg_finalize, 
scan_range) {
-}
+        : OlapScanner(runtime_state, parent, aggregation, need_agg_finalize, 
scan_range) {}
 
 Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, 
bool* eof) {
     // only empty block should be here
@@ -46,8 +44,8 @@ Status VOlapScanner::get_block(RuntimeState* state, 
vectorized::Block* block, bo
     if (!block->mem_reuse()) {
         for (const auto slot_desc : _tuple_desc->slots()) {
             
block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
-                                                    
slot_desc->get_data_type_ptr(),
-                                                    slot_desc->col_name()));
+                                                slot_desc->get_data_type_ptr(),
+                                                slot_desc->col_name()));
         }
     }
 
diff --git a/docs/en/administrator-guide/config/be_config.md 
b/docs/en/administrator-guide/config/be_config.md
index 38b68b4..c917492 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -1495,3 +1495,7 @@ The default value is currently only an empirical value, 
and may need to be modif
 
 Translated with www.DeepL.com/Translator (free version)
 
+### `doris_scan_range_max_mb`
+* Type: int32
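+* Description: The maximum amount of data, in MB, read by each OlapScanner. It limits the number of scanners created for a tablet to about max(1, tablet size / this value); when it is not greater than 0, the size-based limit is one scanner per tablet.
+* Default: 0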
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md 
b/docs/zh-CN/administrator-guide/config/be_config.md
index 8e51bc3..65277f2 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -1511,3 +1511,8 @@ webserver默认工作线程数
 * 类型:int32
 * 描述:load 作业中各个rpc 的最小超时时间。
 * 默认:20
+
+### `doris_scan_range_max_mb`
+* 类型: int32
+* 描述: 每个 OlapScanner 读取的最大数据量，单位为 MB。用于根据 tablet 大小限制每个 tablet 创建的 scanner 数量；小于等于 0 时，按大小计算的上限为每个 tablet 一个 scanner。
+* 默认值: 0

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to