This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f0fad61db46 [pipelineX](bug) Fix file scan operator (#24989)
f0fad61db46 is described below
commit f0fad61db46dc45ee6f4e12f20276a654c9a3944
Author: Gabriel <[email protected]>
AuthorDate: Thu Sep 28 11:12:27 2023 +0800
[pipelineX](bug) Fix file scan operator (#24989)
---
be/src/pipeline/exec/file_scan_operator.cpp | 61 +++++++++++++++++++++++++-
be/src/pipeline/exec/file_scan_operator.h | 11 +++--
be/src/pipeline/exec/union_source_operator.cpp | 2 +-
3 files changed, 68 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index aa0eb100784..28fdb891988 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -31,7 +31,8 @@
namespace doris::pipeline {
Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
- if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+ if (_scan_ranges.empty()) {
+ Base::_eos_dependency->set_ready_for_read();
return Status::OK();
}
@@ -50,4 +51,78 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     }
     return Status::OK();
 }
+
+// Stores the scan ranges assigned to this local state. To avoid creating more scanners
+// than the scanner thread pool can run, at most
+// `config::doris_scanner_thread_pool_thread_num` entries are kept; the file ranges of
+// any further entries are appended round-robin to the kept ones.
+void FileScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
+    // Clamp to at least one scanner: with a non-positive config value the merge loop
+    // below would otherwise index into an empty `_scan_ranges`.
+    const auto configured_scanners = config::doris_scanner_thread_pool_thread_num;
+    const size_t max_scanners =
+            configured_scanners > 0 ? static_cast<size_t>(configured_scanners) : 1;
+    if (scan_ranges.size() <= max_scanners) {
+        _scan_ranges = scan_ranges;
+    } else {
+        // There is no need for the number of scanners to exceed the number of threads in
+        // the thread pool: keep the first `max_scanners` entries and fold the file ranges
+        // of every remaining entry into them.
+        _scan_ranges.assign(scan_ranges.begin(), scan_ranges.begin() + max_scanners);
+        for (size_t idx = max_scanners; idx < scan_ranges.size(); ++idx) {
+            auto& ranges = _scan_ranges[idx % max_scanners]
+                                   .scan_range.ext_scan_range.file_scan_range.ranges;
+            const auto& merged_ranges =
+                    scan_ranges[idx].scan_range.ext_scan_range.file_scan_range.ranges;
+            ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
+        }
+        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
+    }
+    if (!scan_ranges.empty()) {
+        const auto& file_scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
+        if (file_scan_range.__isset.params) {
+            // for compatibility.
+            // in new implement, the tuple id is set in prepare phase
+            _output_tuple_id = file_scan_range.params.dest_tuple_id;
+        }
+    }
+}
+
+// Initializes the base scan local state, then inherits the output tuple id resolved by
+// FileScanOperatorX (plan-node tuple id, possibly overridden in its prepare()).
+// NOTE(review): this overwrites any value set by set_scan_ranges(); confirm that
+// set_scan_ranges() is called after init().
+Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::init(state, info));
+    auto& p = _parent->cast<FileScanOperatorX>();
+    _output_tuple_id = p._output_tuple_id;
+    return Status::OK();
+}
+
+// Runs the generic conjunct processing; reader-level pushdown is not implemented yet.
+Status FileScanLocalState::_process_conjuncts() {
+    RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts());
+    if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+        // The eos dependency is already ready; skip the (future) pushdown below.
+        return Status::OK();
+    }
+    // TODO: Push conjuncts down to reader.
+    return Status::OK();
+}
+
+// Prepares the base scan operator, then prefers the destination tuple id from the
+// query-level file scan params registered for this node id, if any.
+Status FileScanOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::prepare(state));
+    auto* query_ctx = state->get_query_ctx();
+    if (query_ctx != nullptr) {
+        // Single lookup instead of count() followed by operator[].
+        auto it = query_ctx->file_scan_range_params_map.find(_id);
+        if (it != query_ctx->file_scan_range_params_map.end()) {
+            _output_tuple_id = it->second.dest_tuple_id;
+        }
+    }
+    return Status::OK();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/file_scan_operator.h
b/be/src/pipeline/exec/file_scan_operator.h
index e0c3fca78fb..63638872bfd 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -47,10 +47,11 @@ public:
FileScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalState<FileScanLocalState>(state, parent) {}
+ Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+ Status _process_conjuncts() override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners)
override;
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override {
- _scan_ranges = scan_ranges;
- }
+ void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
int parent_id() { return _parent->id(); }
private:
@@ -61,6 +62,7 @@ private:
// 2. parquet file meta
// KVCache<std::string> _kv_cache;
std::unique_ptr<vectorized::ShardedKVCache> _kv_cache;
+ TupleId _output_tuple_id = -1;
};
class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
@@ -68,9 +70,10 @@ public:
FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
: ScanOperatorX<FileScanLocalState>(pool, tnode, descs) {
_output_tuple_id = tnode.file_scan_node.tuple_id;
- _id = tnode.node_id;
}
+ Status prepare(RuntimeState* state) override;
+
private:
friend class FileScanLocalState;
};
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index 028ef678f00..f12678002ae 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -147,7 +147,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* b
return Status::OK();
}
block->swap(*output_block);
- output_block->clear_column_data(row_desc().num_materialized_slots());
+
output_block->clear_column_data(_row_descriptor.num_materialized_slots());
local_state._shared_state->data_queue->push_free_block(std::move(output_block),
child_idx);
}
local_state.reached_limit(block, source_state);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]