This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b9572f9de03 [pipelineX](fix) Fix pip scanner context bug (#29229)
b9572f9de03 is described below
commit b9572f9de038c29aa0e763f4ebcdc1395183ec55
Author: Gabriel <[email protected]>
AuthorDate: Fri Dec 29 13:24:39 2023 +0800
[pipelineX](fix) Fix pip scanner context bug (#29229)
---
be/src/pipeline/exec/scan_operator.cpp | 5 +-
be/src/vec/exec/scan/pip_scanner_context.h | 118 +++++++++++++++++++++--------
be/src/vec/exec/scan/scanner_context.cpp | 3 +
be/src/vec/exec/scan/scanner_context.h | 2 +-
4 files changed, 92 insertions(+), 36 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 0c0ccd42410..de3214b0465 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1239,10 +1239,9 @@ template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<vectorized::VScannerSPtr>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
- _scanner_ctx = PipScannerContext::create_shared(
+ _scanner_ctx = PipXScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
- state()->scan_queue_mem_limit(), p._col_distribute_ids, 1, _scan_dependency,
- _finish_dependency);
+ state()->scan_queue_mem_limit(), _scan_dependency, _finish_dependency);
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index fbf59fffab2..9a717ec08b2 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -21,11 +21,9 @@
#include "runtime/descriptors.h"
#include "scanner_context.h"
-namespace doris {
+namespace doris::pipeline {
-namespace pipeline {
-
-class PipScannerContext : public vectorized::ScannerContext {
+class PipScannerContext final : public vectorized::ScannerContext {
ENABLE_FACTORY_CREATOR(PipScannerContext);
public:
@@ -41,19 +39,6 @@ public:
_col_distribute_ids(col_distribute_ids),
_need_colocate_distribute(!_col_distribute_ids.empty()) {}
- PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
- const TupleDescriptor* output_tuple_desc,
- const RowDescriptor* output_row_descriptor,
- const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
- int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
- const int num_parallel_instances,
- std::shared_ptr<pipeline::ScanDependency> dependency,
- std::shared_ptr<pipeline::Dependency> finish_dependency)
- : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners,
- limit_, max_bytes_in_blocks_queue, num_parallel_instances,
- local_state, dependency, finish_dependency),
- _need_colocate_distribute(false) {}
-
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
int id, bool wait = false) override {
{
@@ -95,9 +80,6 @@ public:
if (_blocks_queues[id].empty()) {
this->reschedule_scanner_ctx();
- if (_dependency) {
- _dependency->block();
- }
}
}
@@ -180,9 +162,6 @@ public:
for (int j = i; j < block_size; j += queue_size) {
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
}
- if (_dependency) {
- _dependency->set_ready();
- }
}
_next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
}
@@ -232,9 +211,6 @@ public:
_blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
_colocate_mutable_blocks[i]->clear();
}
- if (_dependency) {
- _dependency->set_ready();
- }
}
}
}
@@ -248,7 +224,7 @@ public:
return res;
}
-private:
+protected:
int _next_queue_to_feed = 0;
std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
@@ -286,9 +262,6 @@ private:
std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
}
- if (_dependency) {
- _dependency->set_ready();
- }
_colocate_blocks[loc] = get_free_block();
_colocate_mutable_blocks[loc]->set_mutable_columns(
_colocate_blocks[loc]->mutate_columns());
@@ -297,5 +270,86 @@ private:
}
};
-} // namespace pipeline
-} // namespace doris
+class PipXScannerContext final : public vectorized::ScannerContext {
+ ENABLE_FACTORY_CREATOR(PipXScannerContext);
+
+public:
+ PipXScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
+ const TupleDescriptor* output_tuple_desc,
+ const RowDescriptor* output_row_descriptor,
+ const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
+ int64_t max_bytes_in_blocks_queue,
+ std::shared_ptr<pipeline::ScanDependency> dependency,
+ std::shared_ptr<pipeline::Dependency> finish_dependency)
+ : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners,
+ limit_, max_bytes_in_blocks_queue, 1, local_state,
+ dependency, finish_dependency) {}
+ Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
+ int id, bool wait = false) override {
+ std::unique_lock l(_transfer_lock);
+ if (state->is_cancelled()) {
+ set_status_on_error(Status::Cancelled("cancelled"), false);
+ }
+
+ if (!status().ok()) {
+ return _process_status;
+ }
+
+ std::vector<vectorized::BlockUPtr> merge_blocks;
+ if (_blocks_queue.empty()) {
+ *eos = done();
+ return Status::OK();
+ }
+ if (_process_status.is<ErrorCode::CANCELLED>()) {
+ *eos = true;
+ return Status::OK();
+ }
+ *block = std::move(_blocks_queue.front());
+ _blocks_queue.pop_front();
+
+ auto rows = (*block)->rows();
+ while (!_blocks_queue.empty()) {
+ const auto add_rows = (*_blocks_queue.front()).rows();
+ if (rows + add_rows < state->batch_size()) {
+ rows += add_rows;
+ merge_blocks.emplace_back(std::move(_blocks_queue.front()));
+ _blocks_queue.pop_front();
+ } else {
+ break;
+ }
+ }
+
+ if (_blocks_queue.empty()) {
+ this->reschedule_scanner_ctx();
+ _dependency->block();
+ }
+
+ _cur_bytes_in_queue -= (*block)->allocated_bytes();
+ if (!merge_blocks.empty()) {
+ vectorized::MutableBlock m(block->get());
+ for (auto& merge_block : merge_blocks) {
+ _cur_bytes_in_queue -= merge_block->allocated_bytes();
+ static_cast<void>(m.merge(*merge_block));
+ return_free_block(std::move(merge_block));
+ }
+ (*block)->set_columns(std::move(m.mutable_columns()));
+ }
+
+ return Status::OK();
+ }
+
+ void reschedule_scanner_ctx() override {
+ if (done()) {
+ return;
+ }
+ auto state = _scanner_scheduler->submit(shared_from_this());
+ //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times?
+ if (state.ok()) {
+ _num_scheduling_ctx++;
+ } else {
+ set_status_on_error(state, false);
+ }
+ }
+};
+
+} // namespace doris::pipeline
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 954c294574f..16bb1ce8487 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -259,6 +259,9 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
_blocks_queue.push_back(std::move(b));
}
blocks.clear();
+ if (_dependency) {
+ _dependency->set_ready();
+ }
_blocks_queue_added_cv.notify_one();
_queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
}
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 6a3e8553f8f..035d396bf65 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -170,7 +170,7 @@ public:
SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; }
- void reschedule_scanner_ctx();
+ virtual void reschedule_scanner_ctx();
// the unique id of this context
std::string ctx_id;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]