This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 39b5682d59 [Pipeline](shared_scan_opt) Support shared scan opt in
pipeline exec engine
39b5682d59 is described below
commit 39b5682d59e798fa7fb60315dbf823aa34888891
Author: HappenLee <[email protected]>
AuthorDate: Mon Mar 13 10:33:57 2023 +0800
[Pipeline](shared_scan_opt) Support shared scan opt in pipeline exec engine
---
be/src/common/status.h | 4 +
be/src/exprs/runtime_filter.cpp | 50 ----------
be/src/pipeline/exec/scan_operator.cpp | 20 ++--
be/src/pipeline/pipeline_fragment_context.cpp | 3 +
be/src/pipeline/pipeline_task.cpp | 3 +
be/src/runtime/query_fragments_ctx.h | 7 ++
be/src/runtime/runtime_state.h | 6 ++
be/src/vec/CMakeLists.txt | 3 +-
be/src/vec/exec/scan/new_olap_scan_node.cpp | 7 +-
be/src/vec/exec/scan/pip_scanner_context.h | 62 ++++++++++--
be/src/vec/exec/scan/scanner_context.cpp | 105 ++++++++++++---------
be/src/vec/exec/scan/scanner_context.h | 49 ++++------
be/src/vec/exec/scan/vscan_node.cpp | 105 +++++++++++++++------
be/src/vec/exec/scan/vscan_node.h | 12 ++-
be/src/vec/functions/function_timestamp.cpp | 2 +-
be/src/vec/runtime/shared_hash_table_controller.h | 2 +-
be/src/vec/runtime/shared_scanner_controller.h | 69 ++++++++++++++
.../properties/ChildOutputPropertyDeriver.java | 5 +-
.../org/apache/doris/planner/OlapScanNode.java | 3 +
.../main/java/org/apache/doris/qe/Coordinator.java | 25 +++--
.../java/org/apache/doris/qe/SessionVariable.java | 1 +
gensrc/thrift/PaloInternalService.thrift | 4 +
.../data/nereids_syntax_p0/grouping_sets.out | 11 ++-
.../nereids_syntax_p0/sub_query_correlated.out | 90 +++++++++---------
.../sub_query_diff_old_optimize.out | 12 +--
.../correctness_p0/test_colocate_join.groovy | 2 +-
.../duplicate/storage/test_dup_tab_char.groovy | 2 +-
.../nereids_function_p0/gen_function/gen.groovy | 2 +-
.../suites/nereids_syntax_p0/grouping_sets.groovy | 4 +-
.../suites/nereids_syntax_p0/set_operation.groovy | 6 +-
.../nereids_syntax_p0/sub_query_correlated.groovy | 46 ++++-----
.../sub_query_diff_old_optimize.groovy | 4 +-
.../performance_p0/redundant_conjuncts.groovy | 4 +-
.../conditional_functions/test_query_limit.groovy | 6 +-
.../window_functions/test_window_fn.groovy | 20 ++--
35 files changed, 472 insertions(+), 284 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index e4fdc27911..2c0da39b8e 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -246,6 +246,7 @@ E(SEGCOMPACTION_INIT_READER, -3117);
E(SEGCOMPACTION_INIT_WRITER, -3118);
E(SEGCOMPACTION_FAILED, -3119);
E(PIP_WAIT_FOR_RF, -3120);
+E(PIP_WAIT_FOR_SC, -3121);
E(INVERTED_INDEX_INVALID_PARAMETERS, -6000);
E(INVERTED_INDEX_NOT_SUPPORTED, -6001);
E(INVERTED_INDEX_CLUCENE_ERROR, -6002);
@@ -383,6 +384,7 @@ public:
ERROR_CTOR(EndOfFile, END_OF_FILE)
ERROR_CTOR(InternalError, INTERNAL_ERROR)
ERROR_CTOR(WaitForRf, PIP_WAIT_FOR_RF)
+ ERROR_CTOR(WaitForScannerContext, PIP_WAIT_FOR_SC)
ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
ERROR_CTOR(Cancelled, CANCELLED)
ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
@@ -402,6 +404,8 @@ public:
bool ok() const { return _code == ErrorCode::OK; }
+ bool is_blocked_by_sc() const { return _code ==
ErrorCode::PIP_WAIT_FOR_SC; }
+
bool is_io_error() const {
return ErrorCode::IO_ERROR == _code || ErrorCode::READ_UNENOUGH ==
_code ||
ErrorCode::CHECKSUM_ERROR == _code ||
ErrorCode::FILE_DATA_ERROR == _code ||
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 7ec1f5a21c..5db63116cd 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -43,56 +43,6 @@
#include "vec/runtime/shared_hash_table_controller.h"
namespace doris {
-// PrimitiveType->TExprNodeType
-// TODO: use constexpr if we use c++14
-TExprNodeType::type get_expr_node_type(PrimitiveType type) {
- switch (type) {
- case TYPE_BOOLEAN:
- return TExprNodeType::BOOL_LITERAL;
-
- case TYPE_TINYINT:
- case TYPE_SMALLINT:
- case TYPE_INT:
- case TYPE_BIGINT:
- return TExprNodeType::INT_LITERAL;
-
- case TYPE_LARGEINT:
- return TExprNodeType::LARGE_INT_LITERAL;
- break;
-
- case TYPE_NULL:
- return TExprNodeType::NULL_LITERAL;
-
- case TYPE_FLOAT:
- case TYPE_DOUBLE:
- case TYPE_TIME:
- case TYPE_TIMEV2:
- return TExprNodeType::FLOAT_LITERAL;
- break;
-
- case TYPE_DECIMAL32:
- case TYPE_DECIMAL64:
- case TYPE_DECIMAL128I:
- case TYPE_DECIMALV2:
- return TExprNodeType::DECIMAL_LITERAL;
-
- case TYPE_DATETIME:
- case TYPE_DATEV2:
- case TYPE_DATETIMEV2:
- return TExprNodeType::DATE_LITERAL;
-
- case TYPE_CHAR:
- case TYPE_VARCHAR:
- case TYPE_HLL:
- case TYPE_OBJECT:
- case TYPE_STRING:
- return TExprNodeType::STRING_LITERAL;
-
- default:
- DCHECK(false) << "Invalid type.";
- return TExprNodeType::NULL_LITERAL;
- }
-}
// PrimitiveType-> PColumnType
// TODO: use constexpr if we use c++14
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index f673ca1afa..6451927f77 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -25,13 +25,21 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)
bool ScanOperator::can_read() {
- if (_node->_eos || _node->_scanner_ctx->done() ||
_node->_scanner_ctx->no_schedule()) {
- // _eos: need eos
- // _scanner_ctx->done(): need finish
- // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
- return true;
+ if (!_node->_opened) {
+ if (_node->_should_create_scanner || _node->ready_to_open()) {
+ return true;
+ } else {
+ return false;
+ }
} else {
- return !_node->_scanner_ctx->empty_in_queue(); // there are some
blocks to process
+ if (_node->_eos || _node->_scanner_ctx->done() ||
_node->_scanner_ctx->no_schedule()) {
+ // _eos: need eos
+ // _scanner_ctx->done(): need finish
+ // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
+ return true;
+ } else {
+ return _node->ready_to_read(); // there are some blocks to process
+ }
}
}
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 727383d1a5..c2852b67a9 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -344,6 +344,9 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
if (request.__isset.load_job_id) {
_runtime_state->set_load_job_id(request.load_job_id);
}
+ if (request.__isset.shared_scan_opt) {
+ _runtime_state->set_shared_scan_opt(request.shared_scan_opt);
+ }
if (request.query_options.__isset.is_report_success) {
fragment_context->set_is_report_success(request.query_options.is_report_success);
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6f5732121e..fecb7d61b3 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -135,6 +135,9 @@ Status PipelineTask::execute(bool* eos) {
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
return Status::OK();
+ } else if (st.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
+ set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
+ return Status::OK();
}
RETURN_IF_ERROR(st);
}
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index 389cecb860..098bcce063 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -33,6 +33,7 @@
#include "util/threadpool.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/shared_hash_table_controller.h"
+#include "vec/runtime/shared_scanner_controller.h"
namespace doris {
@@ -46,6 +47,7 @@ public:
: fragment_num(total_fragment_num), timeout_second(-1),
_exec_env(exec_env) {
_start_time = DateTimeValue::local_time();
_shared_hash_table_controller.reset(new
vectorized::SharedHashTableController());
+ _shared_scanner_controller.reset(new
vectorized::SharedScannerController());
}
~QueryFragmentsCtx() {
@@ -122,6 +124,10 @@ public:
return _shared_hash_table_controller;
}
+ std::shared_ptr<vectorized::SharedScannerController>
get_shared_scanner_controller() {
+ return _shared_scanner_controller;
+ }
+
vectorized::RuntimePredicate& get_runtime_predicate() { return
_runtime_predicate; }
public:
@@ -167,6 +173,7 @@ private:
std::atomic<bool> _is_cancelled {false};
std::shared_ptr<vectorized::SharedHashTableController>
_shared_hash_table_controller;
+ std::shared_ptr<vectorized::SharedScannerController>
_shared_scanner_controller;
vectorized::RuntimePredicate _runtime_predicate;
};
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index d309390d6a..7ded1c66fc 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -88,6 +88,7 @@ public:
bool abort_on_default_limit_exceeded() const {
return _query_options.abort_on_default_limit_exceeded;
}
+ int query_parallel_instance_num() const { return
_query_options.parallel_instance; }
int max_errors() const { return _query_options.max_errors; }
int query_timeout() const { return _query_options.query_timeout; }
int insert_timeout() const { return _query_options.insert_timeout; }
@@ -200,6 +201,10 @@ public:
int64_t load_job_id() const { return _load_job_id; }
+ void set_shared_scan_opt(bool shared_scan_opt) { _shared_scan_opt =
shared_scan_opt; }
+
+ bool shared_scan_opt() const { return _shared_scan_opt; }
+
const std::string get_error_log_file_path() const { return
_error_log_file_path; }
// append error msg and error line to file when loading data.
@@ -453,6 +458,7 @@ private:
std::string _db_name;
std::string _load_dir;
int64_t _load_job_id;
+ bool _shared_scan_opt = false;
// mini load
int64_t _normal_row_number;
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 29bea35d47..12343adf80 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -320,8 +320,7 @@ set(VEC_FILES
if (WITH_MYSQL)
set(VEC_FILES
${VEC_FILES}
- exec/scan/mysql_scanner.cpp
- )
+ exec/scan/mysql_scanner.cpp)
endif ()
add_library(Vec STATIC
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 8761ca380c..55babf3066 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -411,10 +411,9 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
if (tablet == nullptr) {
- std::stringstream ss;
- ss << "failed to get tablet: " << tablet_id << ", reason: " << err;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
+ auto err_str = fmt::format("failed to get tablet: {}, reason: {}",
tablet_id, err);
+ LOG(WARNING) << err_str;
+ return Status::InternalError(err_str);
}
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges =
&cond_ranges;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 57d458d770..0e418256c2 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -33,20 +33,68 @@ public:
: vectorized::ScannerContext(state, parent, input_tuple_desc,
output_tuple_desc,
scanners, limit,
max_bytes_in_blocks_queue) {}
- void _update_block_queue_empty() override { _blocks_queue_empty =
_blocks_queue.empty(); }
+ Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
+ int id, bool wait = false) override {
+ {
+ std::unique_lock<std::mutex> l(_transfer_lock);
+ if (state->is_cancelled()) {
+ _process_status = Status::Cancelled("cancelled");
+ }
- Status get_block_from_queue(vectorized::BlockUPtr* block, bool* eos,
- bool wait = false) override {
- return vectorized::ScannerContext::get_block_from_queue(block, eos,
false);
+ if (!_process_status.ok()) {
+ return _process_status;
+ }
+ }
+
+ {
+ std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
+ if (!_blocks_queues[id].empty()) {
+ *block = std::move(_blocks_queues[id].front());
+ _blocks_queues[id].pop_front();
+ return Status::OK();
+ } else {
+ *eos = _is_finished || _should_stop;
+ }
+ }
+ return Status::OK();
}
// We should make those method lock free.
bool done() override { return _is_finished || _should_stop ||
_status_error; }
- bool no_schedule() override { return _num_running_scanners == 0 &&
_num_scheduling_ctx == 0; }
- bool empty_in_queue() override { return _blocks_queue_empty; }
+
+ void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks)
override {
+ const int queue_size = _queue_mutexs.size();
+ const int block_size = blocks.size();
+ for (int i = 0; i < queue_size && i < block_size; ++i) {
+ int queue = _next_queue_to_feed;
+ {
+ std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
+ for (int j = i; j < block_size; j += queue_size) {
+ _blocks_queues[queue].emplace_back(std::move(blocks[j]));
+ }
+ }
+ _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
+ }
+ }
+
+ bool empty_in_queue(int id) override {
+ std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
+ return _blocks_queues[id].empty();
+ }
+
+ void set_max_queue_size(int max_queue_size) override {
+ for (int i = 0; i < max_queue_size; ++i) {
+ _blocks_queue_empty.emplace_back(true);
+ _queue_mutexs.emplace_back(new std::mutex);
+ _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
+ }
+ }
private:
- std::atomic_bool _blocks_queue_empty = true;
+ int _next_queue_to_feed = 0;
+ std::vector<bool> _blocks_queue_empty;
+ std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
+ std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
};
} // namespace pipeline
} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 70a962b8ea..16d8971227 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -23,24 +23,53 @@
#include "runtime/runtime_state.h"
#include "util/threadpool.h"
#include "vec/core/block.h"
-#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exec/scan/vscanner.h"
namespace doris::vectorized {
+ScannerContext::ScannerContext(doris::RuntimeState* state_,
doris::vectorized::VScanNode* parent,
+ const doris::TupleDescriptor* input_tuple_desc,
+ const doris::TupleDescriptor* output_tuple_desc,
+ const std::list<VScanner*>& scanners_, int64_t
limit_,
+ int64_t max_bytes_in_blocks_queue_)
+ : _state(state_),
+ _parent(parent),
+ _input_tuple_desc(input_tuple_desc),
+ _output_tuple_desc(output_tuple_desc),
+ _process_status(Status::OK()),
+ _batch_size(state_->batch_size()),
+ limit(limit_),
+ _max_bytes_in_queue(max_bytes_in_blocks_queue_),
+ _scanner_scheduler(state_->exec_env()->scanner_scheduler()),
+ _scanners(scanners_) {
+ ctx_id = UniqueId::gen_uid().to_string();
+ if (_scanners.empty()) {
+ _is_finished = true;
+ }
+}
+
+// After init function call, should not access _parent
Status ScannerContext::init() {
_real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc :
_output_tuple_desc;
// 1. Calculate max concurrency
// TODO: now the max thread num <=
config::doris_scanner_thread_pool_thread_num / 4
// should find a more reasonable value.
- _max_thread_num =
- std::min(config::doris_scanner_thread_pool_thread_num / 4,
(int32_t)_scanners.size());
+ _max_thread_num = _state->shared_scan_opt() ?
config::doris_scanner_thread_pool_thread_num
+ :
config::doris_scanner_thread_pool_thread_num / 4;
+ _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
// For select * from table limit 10; should just use one thread.
if (_parent->should_run_serial()) {
_max_thread_num = 1;
}
+ _scanner_profile = _parent->_scanner_profile;
+ _scanner_sched_counter = _parent->_scanner_sched_counter;
+ _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
+ _free_blocks_memory_usage = _parent->_free_blocks_memory_usage;
+ _newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num;
+ _queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage;
+ _scanner_wait_batch_timer = _parent->_scanner_wait_batch_timer;
// 2. Calculate how many blocks need to be preallocated.
// The calculation logic is as follows:
// 1. Assuming that at most M rows can be scanned in one
scan(config::doris_scanner_row_num),
@@ -50,8 +79,8 @@ Status ScannerContext::init() {
auto doris_scanner_row_num =
limit == -1 ? config::doris_scanner_row_num
:
std::min(static_cast<int64_t>(config::doris_scanner_row_num), limit);
- int real_block_size = limit == -1 ? _state->batch_size()
- :
std::min(static_cast<int64_t>(_state->batch_size()), limit);
+ int real_block_size =
+ limit == -1 ? _batch_size :
std::min(static_cast<int64_t>(_batch_size), limit);
_block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) /
real_block_size;
auto pre_alloc_block_count = _max_thread_num * _block_per_scanner;
@@ -64,7 +93,7 @@ Status ScannerContext::init() {
free_blocks_memory_usage += block->allocated_bytes();
_free_blocks.emplace_back(std::move(block));
}
- _parent->_free_blocks_memory_usage->add(free_blocks_memory_usage);
+ _free_blocks_memory_usage->add(free_blocks_memory_usage);
#ifndef BE_TEST
// 3. get thread token
@@ -91,20 +120,20 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool*
has_free_block) {
if (!_free_blocks.empty()) {
auto block = std::move(_free_blocks.back());
_free_blocks.pop_back();
- _parent->_free_blocks_memory_usage->add(-block->allocated_bytes());
+ _free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
}
}
*has_free_block = false;
- COUNTER_UPDATE(_parent->_newly_create_free_blocks_num, 1);
- return std::make_unique<vectorized::Block>(_real_tuple_desc->slots(),
_state->batch_size(),
+ COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
+ return std::make_unique<vectorized::Block>(_real_tuple_desc->slots(),
_batch_size,
true /*ignore invalid slots*/);
}
void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block>
block) {
block->clear_column_data();
- _parent->_free_blocks_memory_usage->add(block->allocated_bytes());
+ _free_blocks_memory_usage->add(block->allocated_bytes());
std::lock_guard l(_free_blocks_lock);
_free_blocks.emplace_back(std::move(block));
}
@@ -117,18 +146,18 @@ void
ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
_blocks_queue.push_back(std::move(b));
}
blocks.clear();
- _update_block_queue_empty();
_blocks_queue_added_cv.notify_one();
- _parent->_queued_blocks_memory_usage->add(_cur_bytes_in_queue -
old_bytes_in_queue);
+ _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
}
-bool ScannerContext::empty_in_queue() {
- std::unique_lock l(_transfer_lock);
+bool ScannerContext::empty_in_queue(int id) {
+ std::unique_lock<std::mutex> l(_transfer_lock);
return _blocks_queue.empty();
}
-Status ScannerContext::get_block_from_queue(vectorized::BlockUPtr* block,
bool* eos, bool wait) {
- std::unique_lock l(_transfer_lock);
+Status ScannerContext::get_block_from_queue(RuntimeState* state,
vectorized::BlockUPtr* block,
+ bool* eos, int id, bool wait) {
+ std::unique_lock<std::mutex> l(_transfer_lock);
// Normally, the scanner scheduler will schedule ctx.
// But when the amount of data in the blocks queue exceeds the upper limit,
// the scheduler will stop scheduling.
@@ -137,18 +166,18 @@ Status
ScannerContext::get_block_from_queue(vectorized::BlockUPtr* block, bool*
// data can be continuously fetched.
if (_has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
_num_scheduling_ctx++;
- _state->exec_env()->scanner_scheduler()->submit(this);
+ _scanner_scheduler->submit(this);
}
// Wait for block from queue
if (wait) {
- SCOPED_TIMER(_parent->_scanner_wait_batch_timer);
+ SCOPED_TIMER(_scanner_wait_batch_timer);
while (!(!_blocks_queue.empty() || _is_finished ||
!_process_status.ok() ||
- _state->is_cancelled())) {
+ state->is_cancelled())) {
_blocks_queue_added_cv.wait(l);
}
}
- if (_state->is_cancelled()) {
+ if (state->is_cancelled()) {
_process_status = Status::Cancelled("cancelled");
}
@@ -159,10 +188,9 @@ Status
ScannerContext::get_block_from_queue(vectorized::BlockUPtr* block, bool*
if (!_blocks_queue.empty()) {
*block = std::move(_blocks_queue.front());
_blocks_queue.pop_front();
- _update_block_queue_empty();
auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
- _parent->_queued_blocks_memory_usage->add(-block_bytes);
+ _queued_blocks_memory_usage->add(-block_bytes);
return Status::OK();
} else {
*eos = _is_finished;
@@ -181,9 +209,9 @@ bool ScannerContext::set_status_on_error(const Status&
status) {
return false;
}
-Status ScannerContext::_close_and_clear_scanners() {
- std::unique_lock l(_scanners_lock);
- if (_state->enable_profile()) {
+Status ScannerContext::_close_and_clear_scanners(VScanNode* node,
RuntimeState* state) {
+ std::unique_lock<std::mutex> l(_scanners_lock);
+ if (state->enable_profile()) {
std::stringstream scanner_statistics;
std::stringstream scanner_rows_read;
scanner_statistics << "[";
@@ -207,13 +235,12 @@ Status ScannerContext::_close_and_clear_scanners() {
}
scanner_statistics << "]";
scanner_rows_read << "]";
- _parent->_scanner_profile->add_info_string("PerScannerRunningTime",
- scanner_statistics.str());
- _parent->_scanner_profile->add_info_string("PerScannerRowsRead",
scanner_rows_read.str());
+ node->_scanner_profile->add_info_string("PerScannerRunningTime",
scanner_statistics.str());
+ node->_scanner_profile->add_info_string("PerScannerRowsRead",
scanner_rows_read.str());
}
// Only unfinished scanners here
for (auto scanner : _scanners) {
- scanner->close(_state);
+ scanner->close(state);
// Scanners are in ObjPool in ScanNode,
// so no need to delete them here.
}
@@ -221,8 +248,8 @@ Status ScannerContext::_close_and_clear_scanners() {
return Status::OK();
}
-void ScannerContext::clear_and_join() {
- std::unique_lock l(_transfer_lock);
+void ScannerContext::clear_and_join(VScanNode* node, RuntimeState* state) {
+ std::unique_lock<std::mutex> l(_transfer_lock);
do {
if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
break;
@@ -239,14 +266,10 @@ void ScannerContext::clear_and_join() {
}
// Must wait all running scanners stop running.
// So that we can make sure to close all scanners.
- _close_and_clear_scanners();
+ _close_and_clear_scanners(node, state);
- COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
- COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
_blocks_queue.clear();
_free_blocks.clear();
-
- return;
}
bool ScannerContext::no_schedule() {
@@ -273,7 +296,7 @@ void
ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
std::lock_guard l(_transfer_lock);
_num_scheduling_ctx++;
- auto submit_st = _state->exec_env()->scanner_scheduler()->submit(this);
+ auto submit_st = _scanner_scheduler->submit(this);
if (!submit_st.ok()) {
_num_scheduling_ctx--;
}
@@ -285,11 +308,6 @@ void
ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
// same scanner.
if (scanner->need_to_close() && scanner->set_counted_down() &&
(--_num_unfinished_scanners) == 0) {
- // ATTN: this 2 counters will be set at close() again, which is the
final values.
- // But we set them here because the counter set at close() can not
send to FE's profile.
- // So we set them here, and the counter value may be little less than
final values.
- COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
- COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
_is_finished = true;
_blocks_queue_added_cv.notify_one();
}
@@ -306,7 +324,7 @@ void
ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru
if (_has_enough_space_in_blocks_queue()) {
// If there are enough space in blocks queue,
// the scanner number depends on the _free_blocks numbers
- std::lock_guard l(_free_blocks_lock);
+ std::lock_guard f(_free_blocks_lock);
thread_slot_num = _free_blocks.size() / _block_per_scanner;
thread_slot_num += (_free_blocks.size() % _block_per_scanner != 0);
thread_slot_num = std::min(thread_slot_num, _max_thread_num -
_num_running_scanners);
@@ -340,7 +358,6 @@ void
ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru
}
}
}
- return;
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 09500d59cf..1285467a5d 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -32,7 +32,6 @@ namespace doris {
class PriorityThreadPool;
class ThreadPool;
class ThreadPoolToken;
-class ScannerScheduler;
namespace vectorized {
@@ -51,35 +50,21 @@ class ScannerContext {
public:
ScannerContext(RuntimeState* state_, VScanNode* parent, const
TupleDescriptor* input_tuple_desc,
const TupleDescriptor* output_tuple_desc, const
std::list<VScanner*>& scanners_,
- int64_t limit_, int64_t max_bytes_in_blocks_queue_)
- : _state(state_),
- _parent(parent),
- _input_tuple_desc(input_tuple_desc),
- _output_tuple_desc(output_tuple_desc),
- _process_status(Status::OK()),
- limit(limit_),
- _max_bytes_in_queue(max_bytes_in_blocks_queue_),
- _scanners(scanners_) {
- ctx_id = UniqueId::gen_uid().to_string();
- if (_scanners.empty()) {
- _is_finished = true;
- }
- }
+ int64_t limit_, int64_t max_bytes_in_blocks_queue_);
virtual ~ScannerContext() = default;
-
Status init();
vectorized::BlockUPtr get_free_block(bool* has_free_block);
void return_free_block(std::unique_ptr<vectorized::Block> block);
// Append blocks from scanners to the blocks queue.
- void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks);
-
+ virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
blocks);
// Get next block from blocks queue. Called by ScanNode
// Set eos to true if there is no more data to read.
// And if eos is true, the block returned must be nullptr.
- virtual Status get_block_from_queue(vectorized::BlockUPtr* block, bool*
eos, bool wait = true);
+ virtual Status get_block_from_queue(RuntimeState* state,
vectorized::BlockUPtr* block,
+ bool* eos, int id, bool wait = true);
// When a scanner complete a scan, this method will be called
// to return the scanner to the list for next scheduling.
@@ -121,7 +106,7 @@ public:
void get_next_batch_of_scanners(std::list<VScanner*>* current_run);
- void clear_and_join();
+ void clear_and_join(VScanNode* node, RuntimeState* state);
virtual bool no_schedule();
@@ -129,12 +114,14 @@ public:
RuntimeState* state() { return _state; }
- void incr_num_ctx_scheduling(int64_t num) { _num_ctx_scheduling += num; }
- void incr_num_scanner_scheduling(int64_t num) { _num_scanner_scheduling +=
num; }
+ void incr_num_ctx_scheduling(int64_t num) {
_scanner_ctx_sched_counter->update(num); }
+ void incr_num_scanner_scheduling(int64_t num) {
_scanner_sched_counter->update(num); }
VScanNode* parent() { return _parent; }
- virtual bool empty_in_queue();
+ virtual bool empty_in_queue(int id);
+
+ virtual void set_max_queue_size(int max_queue_size) {};
// the unique id of this context
std::string ctx_id;
@@ -143,15 +130,12 @@ public:
std::vector<bthread_t> _btids;
private:
- Status _close_and_clear_scanners();
+ Status _close_and_clear_scanners(VScanNode* node, RuntimeState* state);
inline bool _has_enough_space_in_blocks_queue() const {
return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
}
- // do nothing here, we only do update on pip_scanner_context
- virtual void _update_block_queue_empty() {}
-
protected:
RuntimeState* _state;
VScanNode* _parent;
@@ -198,6 +182,7 @@ protected:
doris::Mutex _free_blocks_lock;
std::vector<vectorized::BlockUPtr> _free_blocks;
+ int _batch_size;
// The limit from SQL's limit clause
int64_t limit;
@@ -221,6 +206,7 @@ protected:
// The max limit bytes of blocks in blocks queue
int64_t _max_bytes_in_queue;
+ doris::vectorized::ScannerScheduler* _scanner_scheduler;
// List "scanners" saves all "unfinished" scanners.
// The scanner scheduler will pop scanners from this list, run scanner,
// and then if the scanner is not finished, will be pushed back to this
list.
@@ -230,8 +216,13 @@ protected:
std::vector<int64_t> _finished_scanner_runtime;
std::vector<int64_t> _finished_scanner_rows_read;
- int64_t _num_ctx_scheduling = 0;
- int64_t _num_scanner_scheduling = 0;
+ std::shared_ptr<RuntimeProfile> _scanner_profile;
+ RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
+ RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
+ RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
+ RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage =
nullptr;
+ RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
+ RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index e7b46ca659..f019df4b60 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -66,6 +66,8 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
_state = state;
+ _is_pipeline_scan = state->enable_pipeline_exec();
+ _shared_scan_opt = state->shared_scan_opt();
const TQueryOptions& query_options = state->query_options();
if (query_options.__isset.max_scan_key_num) {
@@ -92,6 +94,21 @@ Status VScanNode::prepare(RuntimeState* state) {
for (auto& rf_ctx : _runtime_filter_ctxs) {
rf_ctx.runtime_filter->init_profile(_runtime_profile.get());
}
+
+ if (_is_pipeline_scan) {
+ if (_shared_scan_opt) {
+ _shared_scanner_controller =
+
state->get_query_fragments_ctx()->get_shared_scanner_controller();
+ auto [should_create_scanner, queue_id] =
+
_shared_scanner_controller->should_build_scanner_and_queue_id(id());
+ _should_create_scanner = should_create_scanner;
+ _context_queue_id = queue_id;
+ } else {
+ _should_create_scanner = true;
+ _context_queue_id = 0;
+ }
+ }
+
return Status::OK();
}
@@ -113,18 +130,37 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_process_conjuncts());
- if (_eos) {
- return Status::OK();
- }
- std::list<VScanner*> scanners;
- RETURN_IF_ERROR(_init_scanners(&scanners));
- if (scanners.empty()) {
- _eos = true;
+ if (_is_pipeline_scan) {
+ if (_should_create_scanner) {
+ auto status = !_eos ? _prepare_scanners() : Status::OK();
+ if (_scanner_ctx) {
+ DCHECK(!_eos && _num_scanners->value() > 0);
+ _scanner_ctx->set_max_queue_size(
+ _shared_scan_opt ?
std::max(state->query_parallel_instance_num(), 1) : 1);
+ RETURN_IF_ERROR(
+
_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
+ }
+ if (_shared_scan_opt) {
+ _shared_scanner_controller->set_scanner_context(id(),
+ _eos ? nullptr
: _scanner_ctx);
+ }
+ RETURN_IF_ERROR(status);
+ } else if (_shared_scanner_controller->scanner_context_is_ready(id()))
{
+ _scanner_ctx =
_shared_scanner_controller->get_scanner_context(id());
+ if (!_scanner_ctx) {
+ _eos = true;
+ }
+ } else {
+ return Status::WaitForScannerContext("Need wait for scanner
context create");
+ }
} else {
- COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
- RETURN_IF_ERROR(_start_scanners(scanners));
+ RETURN_IF_ERROR(!_eos ? _prepare_scanners() : Status::OK());
+ if (_scanner_ctx) {
+
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
+ }
}
+
RETURN_IF_CANCELLED(state);
_opened = true;
return Status::OK();
@@ -163,7 +199,7 @@ Status VScanNode::get_next(RuntimeState* state,
vectorized::Block* block, bool*
}
vectorized::BlockUPtr scan_block = nullptr;
- RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(&scan_block, eos));
+ RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, &scan_block,
eos, _context_queue_id));
if (*eos) {
DCHECK(scan_block == nullptr);
return Status::OK();
@@ -184,12 +220,6 @@ Status VScanNode::get_next(RuntimeState* state,
vectorized::Block* block, bool*
Status VScanNode::_init_profile() {
// 1. counters for scan node
- auto* memory_usage = _runtime_profile->create_child("MemoryUsage", true,
true);
- _runtime_profile->add_child(memory_usage, false, nullptr);
- _queued_blocks_memory_usage =
- memory_usage->AddHighWaterMarkCounter("QueuedBlocks",
TUnit::BYTES);
- _free_blocks_memory_usage =
memory_usage->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES);
-
_rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead",
TUnit::UNIT);
_total_throughput_counter =
runtime_profile()->add_rate_counter("TotalReadThroughput",
_rows_read_counter);
@@ -201,30 +231,36 @@ Status VScanNode::_init_profile() {
_scanner_profile.reset(new RuntimeProfile("VScanner"));
runtime_profile()->add_child(_scanner_profile.get(), true, nullptr);
+ auto* memory_usage = _scanner_profile->create_child("MemoryUsage", true,
true);
+ _runtime_profile->add_child(memory_usage, false, nullptr);
+ _queued_blocks_memory_usage =
+ memory_usage->AddHighWaterMarkCounter("QueuedBlocks",
TUnit::BYTES);
+ _free_blocks_memory_usage =
memory_usage->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES);
+ _newly_create_free_blocks_num =
+ ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum",
TUnit::UNIT);
+ // time of transfer thread to wait for block from scan thread
+ _scanner_wait_batch_timer = ADD_TIMER(_scanner_profile,
"ScannerBatchWaitTime");
+ _scanner_sched_counter = ADD_COUNTER(_scanner_profile,
"ScannerSchedCount", TUnit::UNIT);
+ _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile,
"ScannerCtxSchedCount", TUnit::UNIT);
+
_scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");
_scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScannerCpuTime");
_prefilter_timer = ADD_TIMER(_scanner_profile, "ScannerPrefilterTime");
_convert_block_timer = ADD_TIMER(_scanner_profile,
"ScannerConvertBlockTime");
_filter_timer = ADD_TIMER(_scanner_profile, "ScannerFilterTime");
- _scanner_sched_counter = ADD_COUNTER(_runtime_profile,
"ScannerSchedCount", TUnit::UNIT);
- _scanner_ctx_sched_counter = ADD_COUNTER(_runtime_profile,
"ScannerCtxSchedCount", TUnit::UNIT);
- // time of transfer thread to wait for block from scan thread
- _scanner_wait_batch_timer = ADD_TIMER(_runtime_profile,
"ScannerBatchWaitTime");
// time of scan thread to wait for worker thread of the thread pool
_scanner_wait_worker_timer = ADD_TIMER(_runtime_profile,
"ScannerWorkerWaitTime");
_pre_alloc_free_blocks_num =
ADD_COUNTER(_runtime_profile, "PreAllocFreeBlocksNum",
TUnit::UNIT);
- _newly_create_free_blocks_num =
- ADD_COUNTER(_runtime_profile, "NewlyCreateFreeBlocksNum",
TUnit::UNIT);
_max_scanner_thread_num = ADD_COUNTER(_runtime_profile,
"MaxScannerThreadNum", TUnit::UNIT);
return Status::OK();
}
Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) {
- if (_state->enable_pipeline_exec()) {
+ if (_is_pipeline_scan) {
_scanner_ctx.reset(new pipeline::PipScannerContext(_state, this,
_input_tuple_desc,
_output_tuple_desc,
scanners, limit(),
_state->query_options().mem_limit / 20));
@@ -234,7 +270,6 @@ Status VScanNode::_start_scanners(const
std::list<VScanner*>& scanners) {
_state->query_options().mem_limit / 20));
}
RETURN_IF_ERROR(_scanner_ctx->init());
-
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
return Status::OK();
}
@@ -374,10 +409,12 @@ Status VScanNode::close(RuntimeState* state) {
void VScanNode::release_resource(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VScanNode::release_resource");
if (_scanner_ctx.get()) {
- // stop and wait the scanner scheduler to be done
- // _scanner_ctx may not be created for some short circuit case.
- _scanner_ctx->set_should_stop();
- _scanner_ctx->clear_and_join();
+ if (!state->enable_pipeline_exec() || _should_create_scanner) {
+ // stop and wait the scanner scheduler to be done
+ // _scanner_ctx may not be created for some short circuit case.
+ _scanner_ctx->set_should_stop();
+ _scanner_ctx->clear_and_join(this, state);
+ }
}
for (auto& ctx : _runtime_filter_ctxs) {
@@ -1318,4 +1355,16 @@ VScanNode::PushDownType
VScanNode::_should_push_down_in_predicate(VInPredicate*
return PushDownType::ACCEPTABLE;
}
+Status VScanNode::_prepare_scanners() {
+ std::list<VScanner*> scanners;
+ RETURN_IF_ERROR(_init_scanners(&scanners));
+ if (scanners.empty()) {
+ _eos = true;
+ } else {
+ COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
+ RETURN_IF_ERROR(_start_scanners(scanners));
+ }
+
+ return Status::OK();
+}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index 23b1e59b51..51dc2edb1b 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -103,6 +103,8 @@ public:
Status try_close();
bool should_run_serial() const { return _should_run_serial; }
+ bool ready_to_open() { return
_shared_scanner_controller->scanner_context_is_ready(id()); }
+ bool ready_to_read() { return
!_scanner_ctx->empty_in_queue(_context_queue_id); }
enum class PushDownType {
// The predicate can not be pushed down to data source
@@ -178,8 +180,12 @@ protected:
// Only predicate on key column can be pushed down.
virtual bool _is_key_column(const std::string& col_name) { return false; }
+ Status _prepare_scanners();
+
protected:
RuntimeState* _state;
+ bool _is_pipeline_scan = false;
+ bool _shared_scan_opt = false;
// For load scan node, there should be both input and output tuple
descriptor.
// For query scan node, there is only output_tuple_desc.
TupleId _input_tuple_id = -1;
@@ -267,7 +273,11 @@ protected:
int64_t _limit_per_scanner = -1;
protected:
- std::unique_ptr<RuntimeProfile> _scanner_profile;
+ std::shared_ptr<vectorized::SharedScannerController>
_shared_scanner_controller;
+ bool _should_create_scanner = false;
+ int _context_queue_id = -1;
+
+ std::shared_ptr<RuntimeProfile> _scanner_profile;
// rows read from the scanner (including those discarded by (pre)filters)
RuntimeProfile::Counter* _rows_read_counter;
diff --git a/be/src/vec/functions/function_timestamp.cpp
b/be/src/vec/functions/function_timestamp.cpp
index 4ec1b30cc1..0ba8ec5f84 100644
--- a/be/src/vec/functions/function_timestamp.cpp
+++ b/be/src/vec/functions/function_timestamp.cpp
@@ -417,7 +417,7 @@ struct UnixTimeStampDateImpl {
static Status execute_impl(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, size_t result,
size_t input_rows_count) {
- const ColumnPtr col_source =
block.get_by_position(arguments[0]).column;
+ const ColumnPtr& col_source =
block.get_by_position(arguments[0]).column;
auto col_result = ColumnVector<Int32>::create();
auto null_map = ColumnVector<UInt8>::create();
auto& col_result_data = col_result->get_data();
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h
b/be/src/vec/runtime/shared_hash_table_controller.h
index a6cf99edca..4c579f1d91 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -85,4 +85,4 @@ private:
};
} // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/runtime/shared_scanner_controller.h
b/be/src/vec/runtime/shared_scanner_controller.h
new file mode 100644
index 0000000000..5fb2244c82
--- /dev/null
+++ b/be/src/vec/runtime/shared_scanner_controller.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "vec/core/block.h"
+#include "vec/exec/scan/scanner_context.h"
+
+namespace doris::vectorized {
+
+class SharedScannerController {
+public:
+ std::pair<bool, int> should_build_scanner_and_queue_id(int my_node_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ auto it = _scanner_parallel.find(my_node_id);
+
+ if (it == _scanner_parallel.cend()) {
+ _scanner_parallel.insert({my_node_id, 0});
+ return {true, 0};
+ } else {
+ auto queue_id = it->second;
+ _scanner_parallel[my_node_id] = queue_id + 1;
+ return {false, queue_id + 1};
+ }
+ }
+
+ void set_scanner_context(int my_node_id,
+ const std::shared_ptr<ScannerContext>
scanner_context) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _scanner_context.insert({my_node_id, scanner_context});
+ }
+
+ bool scanner_context_is_ready(int my_node_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _scanner_context.find(my_node_id) != _scanner_context.end();
+ }
+
+ std::shared_ptr<ScannerContext> get_scanner_context(int my_node_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _scanner_context[my_node_id];
+ }
+
+private:
+ std::mutex _mutex;
+ std::map<int /*node id*/, int /*parallel*/> _scanner_parallel;
+ std::map<int /*node id*/, std::shared_ptr<ScannerContext>>
_scanner_context;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 4623ba311e..467bdc004c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -44,6 +44,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggrega
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -218,7 +219,9 @@ public class ChildOutputPropertyDeriver extends
PlanVisitor<PhysicalProperties,
@Override
public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan,
PlanContext context) {
// TODO: find a better way to handle both tablet num == 1 and colocate
table together in future
- if (!olapScan.getTable().isColocateTable() &&
olapScan.getScanTabletNum() == 1) {
+ if (!olapScan.getTable().isColocateTable() &&
olapScan.getScanTabletNum() == 1
+ &&
(!ConnectContext.get().getSessionVariable().enablePipelineEngine()
+ ||
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() == 1)) {
return PhysicalProperties.GATHER;
} else if (olapScan.getDistributionSpec() instanceof
DistributionSpecHash) {
return PhysicalProperties.createHash((DistributionSpecHash)
olapScan.getDistributionSpec());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index d53697a338..1821b55829 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1118,6 +1118,9 @@ public class OlapScanNode extends ScanNode {
@Override
public int getNumInstances() {
+ if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
+ return
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ }
return result.size();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d35bcec8c0..18b7ae9cab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1622,6 +1622,7 @@ public class Coordinator {
bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(),
parallelExecInstanceNum, params);
} else {
+ params.sharedScanOpt = true;
// case A
for (Entry<TNetworkAddress, Map<Integer,
List<TScanRangeParams>>> entry : fragmentExecParamsMap.get(
fragment.getFragmentId()).scanRangeAssignment.entrySet()) {
@@ -1630,13 +1631,22 @@ public class Coordinator {
for (Integer planNodeId : value.keySet()) {
List<TScanRangeParams> perNodeScanRanges =
value.get(planNodeId);
- int expectedInstanceNum = 1;
- if (parallelExecInstanceNum > 1) {
- //the scan instance num should not larger than the
tablets num
- expectedInstanceNum =
Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
+ List<List<TScanRangeParams>> perInstanceScanRanges =
Lists.newArrayList();
+ if (!enablePipelineEngine) {
+ int expectedInstanceNum = 1;
+ if (parallelExecInstanceNum > 1) {
+ //the scan instance num should not larger than
the tablets num
+ expectedInstanceNum =
Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
+ }
+ perInstanceScanRanges =
ListUtil.splitBySize(perNodeScanRanges,
+ expectedInstanceNum);
+ } else {
+ int expectedInstanceNum =
Math.min(parallelExecInstanceNum,
+ leftMostNode.getNumInstances());
+ for (int j = 0; j < Math.max(expectedInstanceNum,
1); j++) {
+ perInstanceScanRanges.add(perNodeScanRanges);
+ }
}
- List<List<TScanRangeParams>> perInstanceScanRanges =
ListUtil.splitBySize(perNodeScanRanges,
- expectedInstanceNum);
LOG.debug("scan range number per instance is: {}",
perInstanceScanRanges.size());
@@ -3034,6 +3044,8 @@ public class Coordinator {
public List<FInstanceExecParam> instanceExecParams =
Lists.newArrayList();
public FragmentScanRangeAssignment scanRangeAssignment = new
FragmentScanRangeAssignment();
+ public boolean sharedScanOpt = false;
+
public FragmentExecParams(PlanFragment fragment) {
this.fragment = fragment;
}
@@ -3125,6 +3137,7 @@ public class Coordinator {
fragment.isTransferQueryStatisticsWithEveryBatch());
params.setFragment(fragment.toThrift());
params.setLocalParams(Lists.newArrayList());
+ params.setSharedScanOpt(sharedScanOpt);
res.put(instanceExecParam.host, params);
}
TPipelineFragmentParams params =
res.get(instanceExecParam.host);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index ad471791ea..8dfd08a05e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1593,6 +1593,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setCodegenLevel(codegenLevel);
tResult.setBeExecVersion(Config.be_exec_version);
tResult.setEnablePipelineEngine(enablePipelineEngine);
+ tResult.setParallelInstance(parallelExecInstanceNum);
tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index cafcf1044d..d799b28945 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -201,9 +201,12 @@ struct TQueryOptions {
// For debug purpose, skip delete bitmap when reading data
63: optional bool skip_delete_bitmap = false
+
64: optional bool dry_run_query = false
65: optional bool enable_common_expr_pushdown = false;
+
+ 66: optional i32 parallel_instance = 1
}
@@ -598,6 +601,7 @@ struct TPipelineFragmentParams {
22: optional TGlobalDict global_dict // scan node could use the global dict
to encode the string value to an integer
23: optional Planner.TPlanFragment fragment
24: list<TPipelineInstanceParams> local_params
+ 25: optional bool shared_scan_opt = false;
}
struct TPipelineFragmentParamsList {
diff --git a/regression-test/data/nereids_syntax_p0/grouping_sets.out
b/regression-test/data/nereids_syntax_p0/grouping_sets.out
index 6c18f54e2b..bbc2997e3c 100644
--- a/regression-test/data/nereids_syntax_p0/grouping_sets.out
+++ b/regression-test/data/nereids_syntax_p0/grouping_sets.out
@@ -159,16 +159,16 @@
4 3 18
-- !select3 --
-\N \N 24
\N \N 6
+\N \N 24
\N 1 1
\N 2 3
\N 3 4
\N 4 2
\N 6 5
\N 9 3
-1 \N 24
1 \N 6
+1 \N 24
1 1 1
1 2 3
1 3 4
@@ -216,13 +216,13 @@
-- !select7 --
1
-2
-3
-4
1
2
+2
+3
3
4
+4
-- !select1 --
a 1
@@ -258,3 +258,4 @@ all 1
2 1
2 1
2 2
+
diff --git a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out
b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out
index 80640f7372..b7e57d2613 100644
--- a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out
+++ b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out
@@ -167,23 +167,23 @@
24 4
-- !in_subquery_with_order --
-1 3
1 2
+1 3
2 5
-3 3
20 2
22 3
24 4
+3 3
-- !exists_subquery_with_order --
-1 3
1 2
+1 3
2 4
-3 4
-3 3
20 2
22 3
24 4
+3 3
+3 4
-- !scalar_subquery_with_limit --
20 2
@@ -200,13 +200,13 @@
20
-- !case_when_subquery --
-4.0
-4.0
20.0
20.0
20.0
20.0
20.0
+4.0
+4.0
-- !in --
1 2
@@ -244,106 +244,106 @@
3 4
-- !hash_join_with_other_conjuncts1 --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
+2 5
3 3
+3 4
-- !hash_join_with_other_conjuncts2 --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
+2 5
3 3
+3 4
-- !hash_join_with_other_conjuncts3 --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
+2 5
3 3
+3 4
-- !hash_join_with_other_conjuncts4 --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
+2 5
3 3
+3 4
-- !same_subquery_in_conjuncts --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
+2 5
3 3
+3 4
-- !two_subquery_in_one_conjuncts --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
+2 5
3 3
+3 4
-- !multi_subquery_in_and_scalry --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
+2 5
3 3
+3 4
-- !multi_subquery_in_and_exist --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
+2 5
3 3
+3 4
-- !multi_subquery_in_and_exist_sum --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
-3 3
+2 5
20 2
22 3
24 4
+3 3
+3 4
-- !multi_subquery_in_and_in --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
+2 5
3 3
+3 4
-- !multi_subquery_scalar_and_exist --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
-3 3
+2 5
20 2
22 3
24 4
+3 3
+3 4
-- !multi_subquery_scalar_and_scalar --
-1 3
1 2
-2 5
+1 3
2 4
-3 4
+2 5
3 3
+3 4
-- !multi_subquery_in_first_or_in_and_in --
3 3
diff --git
a/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out
b/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out
index 2b66c921cb..91b55d0c89 100644
--- a/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out
+++ b/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out
@@ -1,21 +1,21 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
-- !exists_subquery_with_limit --
-1 3
1 2
+1 3
2 4
-3 4
-3 3
20 2
22 3
24 4
+3 3
+3 4
-- !exists_subquery_with_order_and_limit --
-1 3
1 2
+1 3
2 4
-3 4
-3 3
20 2
22 3
24 4
+3 3
+3 4
diff --git a/regression-test/suites/correctness_p0/test_colocate_join.groovy
b/regression-test/suites/correctness_p0/test_colocate_join.groovy
index 45e4e57bd3..e8bd7206b8 100644
--- a/regression-test/suites/correctness_p0/test_colocate_join.groovy
+++ b/regression-test/suites/correctness_p0/test_colocate_join.groovy
@@ -161,7 +161,7 @@ suite("test_colocate_join") {
(20220101, 101, 202, 200, 100);"""
explain {
- sql("select " +
+ sql("select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ " +
" sum_col1,sum_col2 " +
"from " +
"(select datekey,sum(sum_col1) as sum_col1 from
test_query_colocate where datekey=20220101 group by datekey) t1 " +
diff --git
a/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy
b/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy
index dd1eb207b4..cfbef017db 100644
---
a/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy
+++
b/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy
@@ -59,4 +59,4 @@ PROPERTIES (
sql "drop table if exists ${table1}"
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/nereids_function_p0/gen_function/gen.groovy
b/regression-test/suites/nereids_function_p0/gen_function/gen.groovy
index 22b97ff220..4ca054ab25 100644
--- a/regression-test/suites/nereids_function_p0/gen_function/gen.groovy
+++ b/regression-test/suites/nereids_function_p0/gen_function/gen.groovy
@@ -49,4 +49,4 @@ suite("nereids_gen_fn") {
qt_sql_explode_split_outer_Varchar_Varchar_notnull '''
select id, e from fn_test lateral view explode_split_outer('a,
b, c, d', ',') lv as e order by id, e'''
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy
b/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy
index 8e6cc6e5c7..d5298d154e 100644
--- a/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy
+++ b/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy
@@ -121,7 +121,7 @@ suite("test_nereids_grouping_sets") {
rollup(k1_, k2) order by k1_, k2
"""
- qt_select3 "select 1 as k, k3, sum(k1) from groupingSetsTable group by
cube(k, k3) order by k, k3"
+ qt_select3 "select 1 as k, k3, sum(k1) as sum_k1 from groupingSetsTable
group by cube(k, k3) order by k, k3, sum_k1"
qt_select4 """
select k2, concat(k5, k6) as k_concat, sum(k1) from
groupingSetsTable group by
@@ -230,7 +230,7 @@ suite("test_nereids_grouping_sets") {
from
grouping_sum_table
) T
- ) T2;
+ ) T2 order by a;
"""
order_qt_select1 """
diff --git a/regression-test/suites/nereids_syntax_p0/set_operation.groovy
b/regression-test/suites/nereids_syntax_p0/set_operation.groovy
index 2e06f2aa78..c6838f9986 100644
--- a/regression-test/suites/nereids_syntax_p0/set_operation.groovy
+++ b/regression-test/suites/nereids_syntax_p0/set_operation.groovy
@@ -227,7 +227,7 @@ suite("test_nereids_set_operation") {
}
qt_union39 """(select k1 from setOperationTable order by k1) union all
(select k1 from setOperationTableNotNullable order by k1) order by k1;"""
- qt_union40 """
+ order_qt_union40 """
SELECT k1 FROM setOperationTable WHERE k2 = 2
INTERSECT
SELECT k1 FROM setOperationTable WHERE k1 = 1
@@ -235,7 +235,7 @@ suite("test_nereids_set_operation") {
SELECT k1 FROM setOperationTable WHERE k3 = 2
"""
- qt_union41 """
+ order_qt_union41 """
SELECT k1 FROM setOperationTable WHERE k2 = 1
EXCEPT
SELECT k1 FROM setOperationTable WHERE k3 = 2
@@ -245,7 +245,7 @@ suite("test_nereids_set_operation") {
SELECT k1 FROM setOperationTable WHERE k2 > 0)
"""
- qt_union42 """
+ order_qt_union42 """
SELECT k1 FROM setOperationTable WHERE k2 = 1
EXCEPT
SELECT k1 FROM setOperationTable WHERE k3 = 2
diff --git
a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy
b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy
index 637e53223e..1d153405ea 100644
--- a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy
+++ b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy
@@ -236,35 +236,35 @@ suite ("sub_query_correlated") {
"""*/
//----------subquery with order----------
- qt_scalar_subquery_with_order """
+ order_qt_scalar_subquery_with_order """
select * from sub_query_correlated_subquery1 where
sub_query_correlated_subquery1.k1 > (select
sum(sub_query_correlated_subquery3.k3) a from sub_query_correlated_subquery3
where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2
order by a);
"""
- qt_in_subquery_with_order """
+ order_qt_in_subquery_with_order """
select * from sub_query_correlated_subquery1 where
sub_query_correlated_subquery1.k1 not in (select
sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where
sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by
k2);
"""
- qt_exists_subquery_with_order """
+ order_qt_exists_subquery_with_order """
select * from sub_query_correlated_subquery1 where exists (select
sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where
sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by
k2);
"""
//----------subquery with limit----------
- qt_scalar_subquery_with_limit """
+ order_qt_scalar_subquery_with_limit """
select * from sub_query_correlated_subquery1 where
sub_query_correlated_subquery1.k1 > (select
sum(sub_query_correlated_subquery3.k3) a from sub_query_correlated_subquery3
where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2
limit 1);
"""
//----------subquery with order and limit----------
- qt_scalar_subquery_with_order_and_limit """
+ order_qt_scalar_subquery_with_order_and_limit """
select * from sub_query_correlated_subquery1 where
sub_query_correlated_subquery1.k1 > (select
sum(sub_query_correlated_subquery3.k3) a from sub_query_correlated_subquery3
where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2
order by a limit 1);
"""
//---------subquery with Disjunctions-------------
- qt_scalar_subquery_with_disjunctions """
+ order_qt_scalar_subquery_with_disjunctions """
SELECT DISTINCT k1 FROM sub_query_correlated_subquery1 i1 WHERE
((SELECT count(*) FROM sub_query_correlated_subquery1 WHERE ((k1 = i1.k1) AND
(k2 = 2)) or ((k1 = i1.k1) AND (k2 = 1)) ) > 0);
"""
//--------subquery case when-----------
- qt_case_when_subquery """
+ order_qt_case_when_subquery """
SELECT CASE
WHEN (
SELECT COUNT(*) / 2
@@ -298,86 +298,86 @@ suite ("sub_query_correlated") {
SELECT * FROM sub_query_correlated_subquery1 WHERE EXISTS (SELECT k1
FROM sub_query_correlated_subquery3 WHERE k1 > 10) OR k1 < 10;
"""
- qt_hash_join_with_other_conjuncts1 """
+ order_qt_hash_join_with_other_conjuncts1 """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1
FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 >
sub_query_correlated_subquery3.k3) OR k1 < 10 ORDER BY k1;
"""
- qt_hash_join_with_other_conjuncts2 """
+ order_qt_hash_join_with_other_conjuncts2 """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1
FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 <
sub_query_correlated_subquery3.k3) OR k1 < 10 ORDER BY k1;
"""
- qt_hash_join_with_other_conjuncts3 """
+ order_qt_hash_join_with_other_conjuncts3 """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1
FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 >
sub_query_correlated_subquery3.k3) OR k1 < 11 ORDER BY k1;
"""
- qt_hash_join_with_other_conjuncts4 """
+ order_qt_hash_join_with_other_conjuncts4 """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1
FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 <
sub_query_correlated_subquery3.k3) OR k1 < 11 ORDER BY k1;
"""
- qt_same_subquery_in_conjuncts """
+ order_qt_same_subquery_in_conjuncts """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1
FROM sub_query_correlated_subquery3) OR k1 IN (SELECT k1 FROM
sub_query_correlated_subquery3) OR k1 < 10 ORDER BY k1;
"""
- qt_two_subquery_in_one_conjuncts """
+ order_qt_two_subquery_in_one_conjuncts """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1
FROM sub_query_correlated_subquery3) OR k1 IN (SELECT k3 FROM
sub_query_correlated_subquery3) OR k1 < 10 ORDER BY k1;
"""
- qt_multi_subquery_in_and_scalry """
+ order_qt_multi_subquery_in_and_scalry """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 =
sub_query_correlated_subquery3.k1)
OR k1 < (SELECT sum(k1)
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.v1)
OR k1 < 10 ORDER BY k1;
"""
- qt_multi_subquery_in_and_exist """
+ order_qt_multi_subquery_in_and_exist """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 =
sub_query_correlated_subquery3.k1)
OR exists (SELECT k1 FROM
sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.v1)
OR k1 < 10 ORDER BY k1;
"""
- qt_multi_subquery_in_and_exist_sum """
+ order_qt_multi_subquery_in_and_exist_sum """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 =
sub_query_correlated_subquery3.k1)
OR exists (SELECT sum(k1)
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.v1)
OR k1 < 10 ORDER BY k1;
"""
- qt_multi_subquery_in_and_in """
+ order_qt_multi_subquery_in_and_in """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 =
sub_query_correlated_subquery3.k1)
OR k2 in (SELECT k2 FROM
sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.v1)
OR k1 < 10 ORDER BY k1;
"""
- qt_multi_subquery_scalar_and_exist """
+ order_qt_multi_subquery_scalar_and_exist """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 < (SELECT
sum(k1) FROM sub_query_correlated_subquery3 where
sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1)
OR exists (SELECT sum(k1)
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.v1)
OR k1 < 10 ORDER BY k1;
"""
- qt_multi_subquery_scalar_and_scalar """
+ order_qt_multi_subquery_scalar_and_scalar """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 < (SELECT
sum(k1) FROM sub_query_correlated_subquery3 where
sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1)
OR k2 < (SELECT sum(k1)
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.v1)
OR k1 < 10 ORDER BY k1;
"""
- qt_multi_subquery_in_first_or_in_and_in """
+ order_qt_multi_subquery_in_first_or_in_and_in """
SELECT * FROM sub_query_correlated_subquery1 WHERE (k1 in (SELECT k2
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 =
sub_query_correlated_subquery3.k1)
or k2 in
(SELECT k1 FROM sub_query_correlated_subquery3 where
sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1))
and k1 in (SELECT
k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2
= sub_query_correlated_subquery3.k1)
"""
- qt_multi_subquery_in_second_or_in_and_in """
+ order_qt_multi_subquery_in_second_or_in_and_in """
SELECT * FROM sub_query_correlated_subquery1 WHERE k1 in (SELECT k2
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 =
sub_query_correlated_subquery3.k1)
or k2 in (SELECT k1
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 =
sub_query_correlated_subquery3.k1)
and k1 in (SELECT
k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2
= sub_query_correlated_subquery3.k1)
"""
- qt_multi_subquery_scalar_and_in_or_scalar_and_exists_agg """
+ order_qt_multi_subquery_scalar_and_in_or_scalar_and_exists_agg """
SELECT * FROM sub_query_correlated_subquery1 WHERE ((k1 != (SELECT
sum(k1) FROM sub_query_correlated_subquery3) and k1 = 1 OR k1 < 10) and k1 = 10
and k1 = 15)
and (k1 IN (SELECT k1 FROM
sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.k1)
OR k1 < (SELECT sum(k1) FROM
sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.k1))
and exists (SELECT sum(k1) FROM
sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.k1);
"""
- qt_multi_subquery_scalar_and_in_or_scalar_and_exists """
+ order_qt_multi_subquery_scalar_and_in_or_scalar_and_exists """
SELECT * FROM sub_query_correlated_subquery1 WHERE ((k1 != (SELECT
sum(k1) FROM sub_query_correlated_subquery3) and k1 = 1 OR k1 < 10) and k1 = 10
and k1 = 15)
and (k1 IN (SELECT k1 FROM
sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.k1)
OR k1 < (SELECT sum(k1) FROM
sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 =
sub_query_correlated_subquery3.k1))
diff --git
a/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy
b/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy
index 4ee579f839..6d5bc11a5a 100644
---
a/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy
+++
b/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy
@@ -159,7 +159,7 @@ suite ("sub_query_diff_old_optimize") {
"""*/
//----------subquery with limit----------
- qt_exists_subquery_with_limit """
+ order_qt_exists_subquery_with_limit """
select * from sub_query_diff_old_optimize_subquery1 where exists
(select sub_query_diff_old_optimize_subquery3.k3 from
sub_query_diff_old_optimize_subquery3 where
sub_query_diff_old_optimize_subquery3.v2 =
sub_query_diff_old_optimize_subquery1.k2 limit 1);
"""
@@ -172,7 +172,7 @@ suite ("sub_query_diff_old_optimize") {
}
//----------subquery with order and limit-------
- qt_exists_subquery_with_order_and_limit """
+ order_qt_exists_subquery_with_order_and_limit """
select * from sub_query_diff_old_optimize_subquery1 where exists
(select sub_query_diff_old_optimize_subquery3.k3 from
sub_query_diff_old_optimize_subquery3 where
sub_query_diff_old_optimize_subquery3.v2 =
sub_query_diff_old_optimize_subquery1.k2 order by k1 limit 1);
"""
diff --git a/regression-test/suites/performance_p0/redundant_conjuncts.groovy
b/regression-test/suites/performance_p0/redundant_conjuncts.groovy
index 86035612c3..4027c02aaf 100644
--- a/regression-test/suites/performance_p0/redundant_conjuncts.groovy
+++ b/regression-test/suites/performance_p0/redundant_conjuncts.groovy
@@ -32,10 +32,10 @@ suite("redundant_conjuncts") {
"""
qt_redundant_conjuncts """
- EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=2) */ v1
FROM redundant_conjuncts WHERE k1 = 1 AND k1 = 1;
+ EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=2,
parallel_fragment_exec_instance_num = 1) */ v1 FROM redundant_conjuncts WHERE
k1 = 1 AND k1 = 1;
"""
qt_redundant_conjuncts_gnerated_by_extract_common_filter """
- EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=100) */ v1
FROM redundant_conjuncts WHERE k1 = 1 OR k1 = 2;
+ EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=100,
parallel_fragment_exec_instance_num = 1) */ v1 FROM redundant_conjuncts WHERE
k1 = 1 OR k1 = 2;
"""
}
diff --git
a/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy
b/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy
index eadab28672..87becfe020 100644
---
a/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy
+++
b/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy
@@ -73,15 +73,15 @@ suite("test_query_limit", "query,p0") {
qt_limit16 "select * from (select * from ${tableName} order by k1, k2, k3,
k4 limit 1, 2) a limit 2, 2"
qt_limit17 "select * from (select * from ${tableName} order by k1, k2, k3,
k4 limit 1, 2) a limit 2, 3"
test {
- sql "select * from ${tableName} limit 1, 10"
+ sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=4)*/ * from
${tableName} limit 1, 10"
rowNum 2
}
test {
- sql "select * from ${tableName} limit 2, 10"
+ sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=4)*/ * from
${tableName} limit 2, 10"
rowNum 1
}
test {
- sql "select * from ${tableName} limit 3, 10"
+ sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=4)*/ * from
${tableName} limit 3, 10"
rowNum 0
}
}
diff --git
a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
index 0e517f1d7e..181f597849 100644
---
a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
+++
b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
@@ -123,7 +123,7 @@ suite("test_window_fn") {
select first_value(salary) over(order by enroll_date range between
unbounded preceding and UNBOUNDED following), last_value(salary) over(order by
enroll_date range between unbounded preceding and UNBOUNDED following), salary,
enroll_date from ${tbName1} order by salary, enroll_date;
"""
qt_sql """
- SELECT first_value(ten) OVER (PARTITION BY four ORDER BY ten), ten,
four FROM ${tbName2} WHERE unique2 < 10;
+ SELECT first_value(ten) OVER (PARTITION BY four ORDER BY ten), ten,
four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
"""
qt_sql """
SELECT first_value(unique1) over (order by four range between current
row and unbounded following),
@@ -190,7 +190,7 @@ suite("test_window_fn") {
"""
qt_sql """
SELECT depname, empno, salary, rank() OVER (PARTITION BY depname ORDER
BY salary, empno)
- FROM ${tbName1} ORDER BY rank() OVER (PARTITION BY depname ORDER BY
salary, empno);
+ FROM ${tbName1} ORDER BY rank() OVER (PARTITION BY depname ORDER BY
salary, empno), depname;
"""
qt_sql """
SELECT sum(salary) as s, row_number() OVER (ORDER BY depname) as r,
sum(sum(salary)) OVER (ORDER BY depname DESC) as ss
@@ -207,10 +207,10 @@ suite("test_window_fn") {
SELECT row_number() OVER (ORDER BY unique2) FROM ${tbName2} WHERE
unique2 < 10;
"""
qt_sql """
- SELECT rank() OVER (PARTITION BY four ORDER BY ten) AS rank_1, ten,
four FROM ${tbName2} WHERE unique2 < 10;
+ SELECT rank() OVER (PARTITION BY four ORDER BY ten) AS rank_1, ten,
four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
"""
qt_sql """
- SELECT dense_rank() OVER (PARTITION BY four ORDER BY ten), ten, four
FROM ${tbName2} WHERE unique2 < 10;
+ SELECT dense_rank() OVER (PARTITION BY four ORDER BY ten), ten, four
FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
"""
qt_sql """
select ten, sum(unique1) + sum(unique2) as res, rank() over (order
by sum(unique1) + sum(unique2)) as rank from ${tbName2} group by ten order by
ten;
@@ -256,7 +256,7 @@ suite("test_window_fn") {
SELECT count(1) OVER (PARTITION BY four) as c, four FROM (SELECT *
FROM ${tbName2} WHERE two = 1)s WHERE unique2 < 10 order by c, four;
"""
qt_sql """
- SELECT avg(four) OVER (PARTITION BY four ORDER BY thousand / 100) FROM
${tbName2} WHERE unique2 < 10;
+ SELECT avg(four) OVER (PARTITION BY four ORDER BY thousand / 100) FROM
${tbName2} WHERE unique2 < 10 order by four;
"""
qt_sql """
SELECT count(1) OVER (PARTITION BY four) FROM (SELECT * FROM
${tbName2} WHERE FALSE)s;
@@ -278,19 +278,19 @@ suite("test_window_fn") {
// lag
qt_sql """
- SELECT lag(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four
FROM ${tbName2} WHERE unique2 < 10;
+ SELECT lag(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four
FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
"""
// lead
qt_sql """
- SELECT lead(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten,
four FROM ${tbName2} WHERE unique2 < 10;
+ SELECT lead(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten,
four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
"""
qt_sql """
- SELECT lead(ten * 2, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten,
four FROM ${tbName2} WHERE unique2 < 10;
+ SELECT lead(ten * 2, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten,
four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
"""
qt_sql """
- SELECT lead(ten * 2, 1, -1) OVER (PARTITION BY four ORDER BY ten),
ten, four FROM ${tbName2} WHERE unique2 < 10;
+ SELECT lead(ten * 2, 1, -1) OVER (PARTITION BY four ORDER BY ten),
ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
"""
@@ -353,7 +353,7 @@ suite("test_window_fn") {
qt_sql_window_last_value """
select u_id, u_city, u_salary,
last_value(u_salary) over (partition by u_city order by u_id rows
between unbounded preceding and 1 preceding) last_value_test
- from example_window_tb;
+ from example_window_tb order by u_id;
"""
sql "DROP TABLE IF EXISTS example_window_tb;"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]