This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f0fad61db46 [pipelineX](bug) Fix file scan operator (#24989)
f0fad61db46 is described below

commit f0fad61db46dc45ee6f4e12f20276a654c9a3944
Author: Gabriel <[email protected]>
AuthorDate: Thu Sep 28 11:12:27 2023 +0800

    [pipelineX](bug) Fix file scan operator (#24989)
---
 be/src/pipeline/exec/file_scan_operator.cpp    | 61 +++++++++++++++++++++++++-
 be/src/pipeline/exec/file_scan_operator.h      | 11 +++--
 be/src/pipeline/exec/union_source_operator.cpp |  2 +-
 3 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index aa0eb100784..28fdb891988 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -31,7 +31,8 @@
 namespace doris::pipeline {
 
 Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
-    if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+    if (_scan_ranges.empty()) {
+        Base::_eos_dependency->set_ready_for_read();
         return Status::OK();
     }
 
@@ -50,4 +51,62 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     }
     return Status::OK();
 }
+
+void FileScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
+    int max_scanners = config::doris_scanner_thread_pool_thread_num;
+    if (scan_ranges.size() <= max_scanners) {
+        _scan_ranges = scan_ranges;
+    } else {
+        // There is no need for the number of scanners to exceed the number of threads in thread pool.
+        _scan_ranges.clear();
+        auto range_iter = scan_ranges.begin();
+        for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
+            _scan_ranges.push_back(*range_iter);
+        }
+        for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
+            if (i == max_scanners) {
+                i = 0;
+            }
+            auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
+            auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
+            ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
+        }
+        _scan_ranges.shrink_to_fit();
+        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
+    }
+    if (scan_ranges.size() > 0 &&
+        scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
+        // for compatibility.
+        // in new implement, the tuple id is set in prepare phase
+        _output_tuple_id =
+                scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.dest_tuple_id;
+    }
+}
+
+Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::init(state, info));
+    auto& p = _parent->cast<FileScanOperatorX>();
+    _output_tuple_id = p._output_tuple_id;
+    return Status::OK();
+}
+
+Status FileScanLocalState::_process_conjuncts() {
+    RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts());
+    if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+        return Status::OK();
+    }
+    // TODO: Push conjuncts down to reader.
+    return Status::OK();
+}
+
+Status FileScanOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::prepare(state));
+    if (state->get_query_ctx() != nullptr &&
+        state->get_query_ctx()->file_scan_range_params_map.count(_id) > 0) {
+        TFileScanRangeParams& params = state->get_query_ctx()->file_scan_range_params_map[_id];
+        _output_tuple_id = params.dest_tuple_id;
+    }
+    return Status::OK();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h
index e0c3fca78fb..63638872bfd 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -47,10 +47,11 @@ public:
     FileScanLocalState(RuntimeState* state, OperatorXBase* parent)
             : ScanLocalState<FileScanLocalState>(state, parent) {}
 
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+    Status _process_conjuncts() override;
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override {
-        _scan_ranges = scan_ranges;
-    }
+    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
     int parent_id() { return _parent->id(); }
 
 private:
@@ -61,6 +62,7 @@ private:
     // 2. parquet file meta
     // KVCache<std::string> _kv_cache;
     std::unique_ptr<vectorized::ShardedKVCache> _kv_cache;
+    TupleId _output_tuple_id = -1;
 };
 
 class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
@@ -68,9 +70,10 @@ public:
     FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
             : ScanOperatorX<FileScanLocalState>(pool, tnode, descs) {
         _output_tuple_id = tnode.file_scan_node.tuple_id;
-        _id = tnode.node_id;
     }
 
+    Status prepare(RuntimeState* state) override;
+
 private:
     friend class FileScanLocalState;
 };
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 028ef678f00..f12678002ae 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -147,7 +147,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
             return Status::OK();
         }
         block->swap(*output_block);
-        output_block->clear_column_data(row_desc().num_materialized_slots());
+        output_block->clear_column_data(_row_descriptor.num_materialized_slots());
         local_state._shared_state->data_queue->push_free_block(std::move(output_block), child_idx);
     }
     local_state.reached_limit(block, source_state);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to