This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fd5dd9a391 [Opt](Pipeline) opt pipeline code in mult tablet (#17999)
fd5dd9a391 is described below
commit fd5dd9a3910fa63a0b1ca2d2e120c2a7a7dfbbc0
Author: HappenLee <[email protected]>
AuthorDate: Mon Mar 27 10:02:48 2023 +0800
[Opt](Pipeline) opt pipeline code in mult tablet (#17999)
---
be/src/vec/exec/scan/new_olap_scan_node.cpp | 24 +++++----------
be/src/vec/exec/scan/new_olap_scan_node.h | 1 +
be/src/vec/exec/scan/new_olap_scanner.cpp | 46 +++++++++++++++--------------
be/src/vec/exec/scan/new_olap_scanner.h | 15 +++++-----
be/src/vec/exec/scan/scanner_scheduler.cpp | 9 +++++-
be/src/vec/exec/scan/vscan_node.cpp | 1 -
be/src/vec/exec/scan/vscanner.h | 8 ++++-
7 files changed, 54 insertions(+), 50 deletions(-)
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index c4ba3d611c..d884845ab1 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -55,7 +55,6 @@ Status NewOlapScanNode::prepare(RuntimeState* state) {
Status NewOlapScanNode::_init_profile() {
RETURN_IF_ERROR(VScanNode::_init_profile());
- _num_disks_accessed_counter = ADD_COUNTER(_runtime_profile,
"NumDiskAccess", TUnit::UNIT);
_tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT);
// 1. init segment profile
@@ -405,11 +404,10 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
}
// ranges constructed from scan keys
- std::vector<std::unique_ptr<doris::OlapScanRange>> cond_ranges;
- RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
+ RETURN_IF_ERROR(_scan_keys.get_key_range(&_cond_ranges));
// if we can't get ranges from conditions, we give it a total range
- if (cond_ranges.empty()) {
- cond_ranges.emplace_back(new doris::OlapScanRange());
+ if (_cond_ranges.empty()) {
+ _cond_ranges.emplace_back(new doris::OlapScanRange());
}
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
@@ -462,25 +460,20 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
}
}
- std::unordered_set<std::string> disk_set;
auto build_new_scanner = [&](const TPaloScanRange& scan_range,
const std::vector<OlapScanRange*>& key_ranges,
const std::vector<RowsetReaderSharedPtr>&
rs_readers,
const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets) {
NewOlapScanner* scanner = new NewOlapScanner(_state, this,
_limit_per_scanner,
-
_olap_scan_node.is_preaggregation,
+
_olap_scan_node.is_preaggregation, scan_range,
+ key_ranges, rs_readers,
rs_reader_seg_offsets,
_need_agg_finalize,
_scanner_profile.get());
scanner->set_compound_filters(_compound_filters);
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare
failed.
_scanner_pool.add(scanner);
- RETURN_IF_ERROR(scanner->prepare(scan_range, key_ranges,
_vconjunct_ctx_ptr.get(),
- _olap_filters, _filter_predicates,
_push_down_functions,
- _common_vexpr_ctxs_pushdown.get(),
rs_readers,
- rs_reader_seg_offsets));
scanners->push_back((VScanner*)scanner);
- disk_set.insert(scanner->scan_disk());
return Status::OK();
};
if (is_duplicate_key) {
@@ -489,7 +482,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
std::max(segment_count /
config::doris_scanner_thread_pool_thread_num, 1);
for (int i = 0; i < _scan_ranges.size(); ++i) {
auto& scan_range = _scan_ranges[i];
- std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges =
&cond_ranges;
+ std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges =
&_cond_ranges;
int num_ranges = ranges->size();
std::vector<doris::OlapScanRange*> scanner_ranges(num_ranges);
for (int j = 0; j < num_ranges; ++j) {
@@ -553,7 +546,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
true);
RETURN_IF_ERROR(status);
- std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges =
&cond_ranges;
+ std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges =
&_cond_ranges;
int size_based_scanners_per_tablet = 1;
if (config::doris_scan_range_max_mb > 0) {
@@ -577,10 +570,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges,
{}, {}));
}
}
- COUNTER_SET(_num_disks_accessed_counter,
static_cast<int64_t>(disk_set.size()));
}
- // telemetry::set_span_attribute(span, _num_disks_accessed_counter);
- // telemetry::set_span_attribute(span, _num_scanners);
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h
index b1c95804b7..48fde8436b 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -65,6 +65,7 @@ private:
private:
TOlapScanNode _olap_scan_node;
std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
+ std::vector<std::unique_ptr<doris::OlapScanRange>> _cond_ranges;
OlapScanKeys _scan_keys;
std::vector<TCondition> _olap_filters;
// _compound_filters store conditions in the one compound relationship in
conjunct expr tree except leaf node of `and` node,
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 3aaaf6add4..59b72eea8b 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -24,12 +24,21 @@
namespace doris::vectorized {
NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent,
int64_t limit,
- bool aggregation, bool need_agg_finalize,
RuntimeProfile* profile)
+ bool aggregation, const TPaloScanRange&
scan_range,
+ const std::vector<OlapScanRange*>& key_ranges,
+ const std::vector<RowsetReaderSharedPtr>&
rs_readers,
+ const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets,
+ bool need_agg_finalize, RuntimeProfile* profile)
: VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
_aggregation(aggregation),
_need_agg_finalize(need_agg_finalize),
- _version(-1) {
+ _version(-1),
+ _scan_range(scan_range),
+ _key_ranges(key_ranges) {
+ _tablet_reader_params.rs_readers = rs_readers;
+ _tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets;
_tablet_schema = std::make_shared<TabletSchema>();
+ _is_init = false;
}
static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
@@ -46,19 +55,14 @@ static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
return read_columns_string;
}
-Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
- const std::vector<OlapScanRange*>& key_ranges,
- VExprContext** vconjunct_ctx_ptr,
- const std::vector<TCondition>& filters,
- const FilterPredicates& filter_predicates,
- const std::vector<FunctionFilter>&
function_filters,
- VExprContext** common_vexpr_ctxs_pushdown,
- const std::vector<RowsetReaderSharedPtr>&
rs_readers,
- const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets) {
- RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
- if (common_vexpr_ctxs_pushdown != nullptr) {
+Status NewOlapScanner::init() {
+ _is_init = true;
+ auto parent = static_cast<NewOlapScanNode*>(_parent);
+ RETURN_IF_ERROR(VScanner::prepare(_state,
parent->_vconjunct_ctx_ptr.get()));
+ if (parent->_common_vexpr_ctxs_pushdown != nullptr) {
// Copy common_vexpr_ctxs_pushdown from scan node to this scanner's
_common_vexpr_ctxs_pushdown, just necessary.
- RETURN_IF_ERROR((*common_vexpr_ctxs_pushdown)->clone(_state,
&_common_vexpr_ctxs_pushdown));
+ RETURN_IF_ERROR((*parent->_common_vexpr_ctxs_pushdown)
+ ->clone(_state, &_common_vexpr_ctxs_pushdown));
}
// set limit to reduce end of rowset and segment mem use
@@ -69,8 +73,8 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
: std::min(static_cast<int64_t>(_state->batch_size()),
_parent->limit()));
// Get olap table
- TTabletId tablet_id = scan_range.tablet_id;
- _version = strtoul(scan_range.version.c_str(), nullptr, 10);
+ TTabletId tablet_id = _scan_range.tablet_id;
+ _version = strtoul(_scan_range.version.c_str(), nullptr, 10);
{
auto [tablet, status] =
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
true);
@@ -114,7 +118,7 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
{
std::shared_lock rdlock(_tablet->get_header_lock());
- if (rs_readers.empty()) {
+ if (_tablet_reader_params.rs_readers.empty()) {
const RowsetSharedPtr rowset =
_tablet->rowset_with_max_version();
if (rowset == nullptr) {
std::stringstream ss;
@@ -137,14 +141,12 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
<< ", backend=" << BackendOptions::get_localhost();
return Status::InternalError(ss.str());
}
- } else {
- _tablet_reader_params.rs_readers = rs_readers;
- _tablet_reader_params.rs_readers_segment_offsets =
rs_reader_seg_offsets;
}
// Initialize tablet_reader_params
- RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters,
filter_predicates,
- function_filters));
+ RETURN_IF_ERROR(_init_tablet_reader_params(_key_ranges,
parent->_olap_filters,
+
parent->_filter_predicates,
+
parent->_push_down_functions));
}
}
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 2a04e021db..4b05880bcd 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -35,20 +35,17 @@ struct FilterPredicates;
class NewOlapScanner : public VScanner {
public:
NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t
limit, bool aggregation,
+ const TPaloScanRange& scan_range, const
std::vector<OlapScanRange*>& key_ranges,
+ const std::vector<RowsetReaderSharedPtr>& rs_readers,
+ const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets,
bool need_agg_finalize, RuntimeProfile* profile);
+ Status init() override;
+
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
- Status prepare(const TPaloScanRange& scan_range, const
std::vector<OlapScanRange*>& key_ranges,
- VExprContext** vconjunct_ctx_ptr, const
std::vector<TCondition>& filters,
- const FilterPredicates& filter_predicates,
- const std::vector<FunctionFilter>& function_filters,
- VExprContext** common_vexpr_ctxs_pushdown,
- const std::vector<RowsetReaderSharedPtr>& rs_readers = {},
- const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets = {});
-
const std::string& scan_disk() const { return _tablet->data_dir()->path();
}
void set_compound_filters(const std::vector<TCondition>& compound_filters);
@@ -75,6 +72,8 @@ private:
TabletSchemaSPtr _tablet_schema;
TabletSharedPtr _tablet;
int64_t _version;
+ const TPaloScanRange& _scan_range;
+ std::vector<OlapScanRange*> _key_ranges;
TabletReader::ReaderParams _tablet_reader_params;
std::unique_ptr<TabletReader> _tablet_reader;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 38121538f7..a6a76e57d7 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -260,7 +260,14 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
bool eos = false;
RuntimeState* state = ctx->state();
DCHECK(nullptr != state);
- if (!scanner->is_open()) {
+ if (!scanner->is_init()) {
+ status = scanner->init();
+ if (!status.ok()) {
+ ctx->set_status_on_error(status);
+ eos = true;
+ }
+ }
+ if (!eos && !scanner->is_open()) {
status = scanner->open(state);
if (!status.ok()) {
ctx->set_status_on_error(status);
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 3e9459071b..1286baa573 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -1363,7 +1363,6 @@ Status VScanNode::_prepare_scanners() {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(_start_scanners(scanners));
}
-
return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 88cac3db42..26248d9b54 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -39,7 +39,9 @@ class VScanner {
public:
VScanner(RuntimeState* state, VScanNode* parent, int64_t limit,
RuntimeProfile* profile);
- virtual ~VScanner() {}
+ virtual ~VScanner() = default;
+
+ virtual Status init() { return Status::OK(); }
virtual Status open(RuntimeState* state) { return Status::OK(); }
@@ -67,6 +69,8 @@ public:
int64_t get_rows_read() const { return _num_rows_read; }
+ bool is_init() const { return _is_init; }
+
Status try_append_late_arrival_runtime_filter();
// Call start_wait_worker_timer() when submit the scanner to the thread
pool.
@@ -179,6 +183,8 @@ protected:
// set to true after decrease the "_num_unfinished_scanners" in scanner
context
bool _is_counted_down = false;
+ bool _is_init = true;
+
ScannerCounter _counter;
int64_t _per_scanner_timer = 0;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]