This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new e9f9ab8a3a8 [opt](vfilescanner) interrupt running parquet/orc readers 
when scannode is finished (#28264)
e9f9ab8a3a8 is described below

commit e9f9ab8a3a8047b979a2fddf1b8d382c1de019d9
Author: Ashin Gau <[email protected]>
AuthorDate: Thu Dec 14 23:23:08 2023 +0800

    [opt](vfilescanner) interrupt running parquet/orc readers when scannode is 
finished (#28264)
    
    `VScanNode::get_next` checks whether the `ScanNode` has reached its `limit` 
condition; if so, it sends `eos` to `TaskScheduler`, and `TaskScheduler` then 
tries to close the `ScanNode`.
    However, `ScanNode` must wait for all running scanners to finish, so even if 
`ScanNode` has reached the `limit` condition, it can't be closed immediately.
    This PR tries to interrupt the running readers so that `ScanNode` can end as 
soon as possible.
---
 be/src/io/io_common.h                              |  3 ++
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 35 ++++++++++++++++++++--
 .../exec/format/parquet/vparquet_page_reader.cpp   |  6 ++++
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 22 +++++++++++---
 be/src/vec/exec/scan/new_file_scan_node.cpp        |  4 +++
 be/src/vec/exec/scan/scanner_context.cpp           |  1 +
 be/src/vec/exec/scan/scanner_context.h             |  7 +++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |  9 +++++-
 be/src/vec/exec/scan/vfile_scanner.h               |  2 ++
 be/src/vec/exec/scan/vscanner.cpp                  |  2 +-
 be/src/vec/exec/scan/vscanner.h                    |  6 ++++
 11 files changed, 88 insertions(+), 9 deletions(-)

diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index 8849330746a..5e6cb026ea8 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -56,11 +56,14 @@ public:
             : query_id(query_id),
               is_disposable(use_disposable_cache),
               read_segment_index(read_segment_index),
+              should_stop(false),
               file_cache_stats(stats) {}
     ReaderType reader_type = ReaderType::UNKNOWN;
     const TUniqueId* query_id = nullptr;
     bool is_disposable = false;
     bool read_segment_index = false;
+    // stop reader when reading, used in some interrupted operations
+    bool should_stop = false;
     FileCacheStatistics* file_cache_stats = nullptr;
 };
 
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index ff8759fd1e7..8076260cff4 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -114,6 +114,9 @@ void ORCFileInputStream::read(void* buf, uint64_t length, 
uint64_t offset) {
     uint64_t has_read = 0;
     char* out = reinterpret_cast<char*>(buf);
     while (has_read < length) {
+        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+            throw orc::ParseError("stop");
+        }
         size_t loop_read;
         Slice result(out + has_read, length - has_read);
         Status st = _file_reader->read_at(offset + has_read, result, 
&loop_read, _io_ctx);
@@ -242,6 +245,9 @@ Status OrcReader::_create_file_reader() {
         // invoker maybe just skip Status.NotFound and continue
         // so we need distinguish between it and other kinds of errors
         std::string _err_msg = e.what();
+        if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
+            return Status::EndOfFile("stop");
+        }
         if (_err_msg.find("No such file or directory") != std::string::npos) {
             return Status::NotFound(_err_msg);
         }
@@ -805,7 +811,11 @@ Status OrcReader::set_fill_columns(
         _remaining_rows = _row_reader->getNumberOfRows();
 
     } catch (std::exception& e) {
-        return Status::InternalError("Failed to create orc row reader. reason 
= {}", e.what());
+        std::string _err_msg = e.what();
+        // ignore stop exception
+        if (!(_io_ctx && _io_ctx->should_stop && _err_msg == "stop")) {
+            return Status::InternalError("Failed to create orc row reader. 
reason = {}", _err_msg);
+        }
     }
 
     if (!_slot_id_to_filter_conjuncts) {
@@ -1384,6 +1394,11 @@ std::string OrcReader::_get_field_name_lower_case(const 
orc::Type* orc_type, int
 }
 
 Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    if (_io_ctx && _io_ctx->should_stop) {
+        *eof = true;
+        *read_rows = 0;
+        return Status::OK();
+    }
     if (_push_down_agg_type == TPushAggOp::type::COUNT) {
         auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size);
 
@@ -1421,8 +1436,15 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                     return Status::OK();
                 }
             } catch (std::exception& e) {
+                std::string _err_msg = e.what();
+                if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
+                    block->clear_column_data();
+                    *eof = true;
+                    *read_rows = 0;
+                    return Status::OK();
+                }
                 return Status::InternalError("Orc row reader nextBatch failed. 
reason = {}",
-                                             e.what());
+                                             _err_msg);
             }
         }
 
@@ -1480,8 +1502,15 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                     return Status::OK();
                 }
             } catch (std::exception& e) {
+                std::string _err_msg = e.what();
+                if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
+                    block->clear_column_data();
+                    *eof = true;
+                    *read_rows = 0;
+                    return Status::OK();
+                }
                 return Status::InternalError("Orc row reader nextBatch failed. 
reason = {}",
-                                             e.what());
+                                             _err_msg);
             }
         }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 47be7d8bcc9..7ca53590d6c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -62,6 +62,9 @@ Status PageReader::next_page_header() {
     const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 
20;
     uint32_t real_header_size = 0;
     while (true) {
+        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+            return Status::EndOfFile("stop");
+        }
         header_size = std::min(header_size, max_size);
         RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, 
header_size, _io_ctx));
         real_header_size = header_size;
@@ -99,6 +102,9 @@ Status PageReader::get_page_data(Slice& slice) {
     if (UNLIKELY(_state != HEADER_PARSED)) {
         return Status::IOError("Should generate page header first to load 
current page data");
     }
+    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+        return Status::EndOfFile("stop");
+    }
     slice.size = _cur_page_header.compressed_page_size;
     RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
     _offset += slice.size;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 53655a187e7..ccdbee91353 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -209,6 +209,9 @@ void ParquetReader::_close_internal() {
 }
 
 Status ParquetReader::_open_file() {
+    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+        return Status::EndOfFile("stop");
+    }
     if (_file_reader == nullptr) {
         SCOPED_RAW_TIMER(&_statistics.open_file_time);
         ++_statistics.open_file_num;
@@ -507,9 +510,11 @@ Status 
ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
 
 Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
     if (_current_group_reader == nullptr || _row_group_eof) {
-        if (_read_row_groups.size() > 0) {
-            RETURN_IF_ERROR(_next_row_group_reader());
-        } else {
+        Status st = _next_row_group_reader();
+        if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>()) {
+            return st;
+        }
+        if (_current_group_reader == nullptr || _row_group_eof || 
st.is<ErrorCode::END_OF_FILE>()) {
             _current_group_reader.reset(nullptr);
             _row_group_eof = true;
             *read_rows = 0;
@@ -517,7 +522,6 @@ Status ParquetReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof)
             return Status::OK();
         }
     }
-    DCHECK(_current_group_reader != nullptr);
     if (_push_down_agg_type == TPushAggOp::type::COUNT) {
         auto rows = std::min(_current_group_reader->get_remaining_rows(), 
(int64_t)_batch_size);
 
@@ -540,6 +544,13 @@ Status ParquetReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof)
         SCOPED_RAW_TIMER(&_statistics.column_read_time);
         Status batch_st =
                 _current_group_reader->next_batch(block, _batch_size, 
read_rows, &_row_group_eof);
+        if (batch_st.is<ErrorCode::END_OF_FILE>()) {
+            block->clear_column_data();
+            _current_group_reader.reset(nullptr);
+            *read_rows = 0;
+            *eof = true;
+            return Status::OK();
+        }
         if (!batch_st.ok()) {
             return Status::InternalError("Read parquet file {} failed, reason 
= {}",
                                          _scan_range.path, 
batch_st.to_string());
@@ -736,6 +747,9 @@ bool ParquetReader::_has_page_index(const 
std::vector<tparquet::ColumnChunk>& co
 
 Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
                                           std::vector<RowRange>& 
candidate_row_ranges) {
+    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+        return Status::EndOfFile("stop");
+    }
     SCOPED_RAW_TIMER(&_statistics.page_index_filter_time);
 
     std::function<void()> read_whole_row_group = [&]() {
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp 
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 43f43722e9c..da33538b8c3 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -63,6 +63,10 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
     int max_scanners =
             config::doris_scanner_thread_pool_thread_num / 
state->query_parallel_instance_num();
     max_scanners = max_scanners == 0 ? 1 : max_scanners;
+    // For select * from table limit 10; should just use one thread.
+    if (should_run_serial()) {
+        max_scanners = 1;
+    }
     if (scan_ranges.size() <= max_scanners) {
         _scan_ranges = scan_ranges;
     } else {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 27ed7509987..3421c914b6b 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -55,6 +55,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, 
doris::vectorized::V
           _max_bytes_in_queue(max_bytes_in_blocks_queue_),
           _scanner_scheduler(state_->exec_env()->scanner_scheduler()),
           _scanners(scanners_),
+          _scanners_ref(scanners_.begin(), scanners_.end()),
           _num_parallel_instances(num_parallel_instances) {
     ctx_id = UniqueId::gen_uid().to_string();
     if (_scanners.empty()) {
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index c2c8612f861..9fb24651ce8 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -102,6 +102,11 @@ public:
     void set_should_stop() {
         std::lock_guard l(_transfer_lock);
         _should_stop = true;
+        for (const VScannerWPtr& scanner : _scanners_ref) {
+            if (VScannerSPtr sc = scanner.lock()) {
+                sc->try_stop();
+            }
+        }
         _blocks_queue_added_cv.notify_one();
     }
 
@@ -244,6 +249,8 @@ protected:
     // Not need to protect by lock, because only one scheduler thread will 
access to it.
     doris::Mutex _scanners_lock;
     std::list<VScannerSPtr> _scanners;
+    // weak pointer for _scanners, used in stop function
+    std::vector<VScannerWPtr> _scanners_ref;
     std::vector<int64_t> _finished_scanner_runtime;
     std::vector<int64_t> _finished_scanner_rows_read;
     std::vector<int64_t> _finished_scanner_wait_worker_time;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 5924aa3963f..72ae8e2d73f 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -658,7 +658,7 @@ Status VFileScanner::_get_next_reader() {
         }
         _cur_reader.reset(nullptr);
         _src_block_init = false;
-        if (_next_range >= _ranges.size()) {
+        if (_next_range >= _ranges.size() || _should_stop) {
             _scanner_eof = true;
             _state->update_num_finished_scan_range(1);
             return Status::OK();
@@ -1044,4 +1044,11 @@ Status VFileScanner::close(RuntimeState* state) {
     return Status::OK();
 }
 
+void VFileScanner::try_stop() {
+    VScanner::try_stop();
+    if (_io_ctx) {
+        _io_ctx->should_stop = true;
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.h 
b/be/src/vec/exec/scan/vfile_scanner.h
index e3533ce05c2..58355cdbe36 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -71,6 +71,8 @@ public:
 
     Status close(RuntimeState* state) override;
 
+    void try_stop() override;
+
     Status prepare(const VExprContextSPtrs& conjuncts,
                    std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range,
                    const std::unordered_map<std::string, int>* 
colname_to_slot_id);
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 52cf88f260a..2264eab9aa4 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -90,7 +90,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, 
bool* eof) {
             }
             // record rows return (after filter) for _limit check
             _num_rows_return += block->rows();
-        } while (!state->is_cancelled() && block->rows() == 0 && !(*eof) &&
+        } while (!_should_stop && !state->is_cancelled() && block->rows() == 0 
&& !(*eof) &&
                  _num_rows_read < rows_read_threshold);
     }
 
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index c29aafafc04..3dd57bedb51 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -64,6 +64,9 @@ public:
 
     virtual Status close(RuntimeState* state);
 
+    // Try to stop scanner, and all running readers.
+    virtual void try_stop() { _should_stop = true; };
+
     virtual std::string get_name() { return ""; }
 
     // return the readable name of current scan range.
@@ -212,8 +215,11 @@ protected:
 
     ScannerCounter _counter;
     int64_t _per_scanner_timer = 0;
+
+    bool _should_stop = false;
 };
 
 using VScannerSPtr = std::shared_ptr<VScanner>;
+using VScannerWPtr = std::weak_ptr<VScanner>;
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to