This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new e9f9ab8a3a8 [opt](vfilescanner) interrupt running parquet/orc readers
when scannode is finished (#28264)
e9f9ab8a3a8 is described below
commit e9f9ab8a3a8047b979a2fddf1b8d382c1de019d9
Author: Ashin Gau <[email protected]>
AuthorDate: Thu Dec 14 23:23:08 2023 +0800
[opt](vfilescanner) interrupt running parquet/orc readers when scannode is
finished (#28264)
`VScanNode::get_next` checks whether the `ScanNode` has reached its `limit`
condition and, if so, sends `eos` to `TaskScheduler`, which then tries to
close the `ScanNode`.
However, `ScanNode` must wait for all running scanners to finish, so even if
`ScanNode` has reached its `limit` condition, it cannot be closed immediately.
This PR tries to interrupt the running readers so that `ScanNode` can finish as
soon as possible.
---
be/src/io/io_common.h | 3 ++
be/src/vec/exec/format/orc/vorc_reader.cpp | 35 ++++++++++++++++++++--
.../exec/format/parquet/vparquet_page_reader.cpp | 6 ++++
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 22 +++++++++++---
be/src/vec/exec/scan/new_file_scan_node.cpp | 4 +++
be/src/vec/exec/scan/scanner_context.cpp | 1 +
be/src/vec/exec/scan/scanner_context.h | 7 +++++
be/src/vec/exec/scan/vfile_scanner.cpp | 9 +++++-
be/src/vec/exec/scan/vfile_scanner.h | 2 ++
be/src/vec/exec/scan/vscanner.cpp | 2 +-
be/src/vec/exec/scan/vscanner.h | 6 ++++
11 files changed, 88 insertions(+), 9 deletions(-)
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index 8849330746a..5e6cb026ea8 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -56,11 +56,14 @@ public:
: query_id(query_id),
is_disposable(use_disposable_cache),
read_segment_index(read_segment_index),
+ should_stop(false),
file_cache_stats(stats) {}
ReaderType reader_type = ReaderType::UNKNOWN;
const TUniqueId* query_id = nullptr;
bool is_disposable = false;
bool read_segment_index = false;
+ // stop reader when reading, used in some interrupted operations
+ bool should_stop = false;
FileCacheStatistics* file_cache_stats = nullptr;
};
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index ff8759fd1e7..8076260cff4 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -114,6 +114,9 @@ void ORCFileInputStream::read(void* buf, uint64_t length,
uint64_t offset) {
uint64_t has_read = 0;
char* out = reinterpret_cast<char*>(buf);
while (has_read < length) {
+ if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+ throw orc::ParseError("stop");
+ }
size_t loop_read;
Slice result(out + has_read, length - has_read);
Status st = _file_reader->read_at(offset + has_read, result,
&loop_read, _io_ctx);
@@ -242,6 +245,9 @@ Status OrcReader::_create_file_reader() {
// invoker maybe just skip Status.NotFound and continue
// so we need distinguish between it and other kinds of errors
std::string _err_msg = e.what();
+ if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
+ return Status::EndOfFile("stop");
+ }
if (_err_msg.find("No such file or directory") != std::string::npos) {
return Status::NotFound(_err_msg);
}
@@ -805,7 +811,11 @@ Status OrcReader::set_fill_columns(
_remaining_rows = _row_reader->getNumberOfRows();
} catch (std::exception& e) {
- return Status::InternalError("Failed to create orc row reader. reason
= {}", e.what());
+ std::string _err_msg = e.what();
+ // ignore stop exception
+ if (!(_io_ctx && _io_ctx->should_stop && _err_msg == "stop")) {
+ return Status::InternalError("Failed to create orc row reader.
reason = {}", _err_msg);
+ }
}
if (!_slot_id_to_filter_conjuncts) {
@@ -1384,6 +1394,11 @@ std::string OrcReader::_get_field_name_lower_case(const
orc::Type* orc_type, int
}
Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+ if (_io_ctx && _io_ctx->should_stop) {
+ *eof = true;
+ *read_rows = 0;
+ return Status::OK();
+ }
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size);
@@ -1421,8 +1436,15 @@ Status OrcReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
return Status::OK();
}
} catch (std::exception& e) {
+ std::string _err_msg = e.what();
+ if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
+ block->clear_column_data();
+ *eof = true;
+ *read_rows = 0;
+ return Status::OK();
+ }
return Status::InternalError("Orc row reader nextBatch failed.
reason = {}",
- e.what());
+ _err_msg);
}
}
@@ -1480,8 +1502,15 @@ Status OrcReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
return Status::OK();
}
} catch (std::exception& e) {
+ std::string _err_msg = e.what();
+ if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
+ block->clear_column_data();
+ *eof = true;
+ *read_rows = 0;
+ return Status::OK();
+ }
return Status::InternalError("Orc row reader nextBatch failed.
reason = {}",
- e.what());
+ _err_msg);
}
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 47be7d8bcc9..7ca53590d6c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -62,6 +62,9 @@ Status PageReader::next_page_header() {
const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb <<
20;
uint32_t real_header_size = 0;
while (true) {
+ if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+ return Status::EndOfFile("stop");
+ }
header_size = std::min(header_size, max_size);
RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset,
header_size, _io_ctx));
real_header_size = header_size;
@@ -99,6 +102,9 @@ Status PageReader::get_page_data(Slice& slice) {
if (UNLIKELY(_state != HEADER_PARSED)) {
return Status::IOError("Should generate page header first to load
current page data");
}
+ if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+ return Status::EndOfFile("stop");
+ }
slice.size = _cur_page_header.compressed_page_size;
RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
_offset += slice.size;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 53655a187e7..ccdbee91353 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -209,6 +209,9 @@ void ParquetReader::_close_internal() {
}
Status ParquetReader::_open_file() {
+ if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+ return Status::EndOfFile("stop");
+ }
if (_file_reader == nullptr) {
SCOPED_RAW_TIMER(&_statistics.open_file_time);
++_statistics.open_file_num;
@@ -507,9 +510,11 @@ Status
ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool*
eof) {
if (_current_group_reader == nullptr || _row_group_eof) {
- if (_read_row_groups.size() > 0) {
- RETURN_IF_ERROR(_next_row_group_reader());
- } else {
+ Status st = _next_row_group_reader();
+ if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>()) {
+ return st;
+ }
+ if (_current_group_reader == nullptr || _row_group_eof ||
st.is<ErrorCode::END_OF_FILE>()) {
_current_group_reader.reset(nullptr);
_row_group_eof = true;
*read_rows = 0;
@@ -517,7 +522,6 @@ Status ParquetReader::get_next_block(Block* block, size_t*
read_rows, bool* eof)
return Status::OK();
}
}
- DCHECK(_current_group_reader != nullptr);
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
auto rows = std::min(_current_group_reader->get_remaining_rows(),
(int64_t)_batch_size);
@@ -540,6 +544,13 @@ Status ParquetReader::get_next_block(Block* block, size_t*
read_rows, bool* eof)
SCOPED_RAW_TIMER(&_statistics.column_read_time);
Status batch_st =
_current_group_reader->next_batch(block, _batch_size,
read_rows, &_row_group_eof);
+ if (batch_st.is<ErrorCode::END_OF_FILE>()) {
+ block->clear_column_data();
+ _current_group_reader.reset(nullptr);
+ *read_rows = 0;
+ *eof = true;
+ return Status::OK();
+ }
if (!batch_st.ok()) {
return Status::InternalError("Read parquet file {} failed, reason
= {}",
_scan_range.path,
batch_st.to_string());
@@ -736,6 +747,9 @@ bool ParquetReader::_has_page_index(const
std::vector<tparquet::ColumnChunk>& co
Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
std::vector<RowRange>&
candidate_row_ranges) {
+ if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+ return Status::EndOfFile("stop");
+ }
SCOPED_RAW_TIMER(&_statistics.page_index_filter_time);
std::function<void()> read_whole_row_group = [&]() {
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 43f43722e9c..da33538b8c3 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -63,6 +63,10 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
int max_scanners =
config::doris_scanner_thread_pool_thread_num /
state->query_parallel_instance_num();
max_scanners = max_scanners == 0 ? 1 : max_scanners;
+ // For select * from table limit 10; should just use one thread.
+ if (should_run_serial()) {
+ max_scanners = 1;
+ }
if (scan_ranges.size() <= max_scanners) {
_scan_ranges = scan_ranges;
} else {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 27ed7509987..3421c914b6b 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -55,6 +55,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_,
doris::vectorized::V
_max_bytes_in_queue(max_bytes_in_blocks_queue_),
_scanner_scheduler(state_->exec_env()->scanner_scheduler()),
_scanners(scanners_),
+ _scanners_ref(scanners_.begin(), scanners_.end()),
_num_parallel_instances(num_parallel_instances) {
ctx_id = UniqueId::gen_uid().to_string();
if (_scanners.empty()) {
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index c2c8612f861..9fb24651ce8 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -102,6 +102,11 @@ public:
void set_should_stop() {
std::lock_guard l(_transfer_lock);
_should_stop = true;
+ for (const VScannerWPtr& scanner : _scanners_ref) {
+ if (VScannerSPtr sc = scanner.lock()) {
+ sc->try_stop();
+ }
+ }
_blocks_queue_added_cv.notify_one();
}
@@ -244,6 +249,8 @@ protected:
// Not need to protect by lock, because only one scheduler thread will
access to it.
doris::Mutex _scanners_lock;
std::list<VScannerSPtr> _scanners;
+ // weak pointer for _scanners, used in stop function
+ std::vector<VScannerWPtr> _scanners_ref;
std::vector<int64_t> _finished_scanner_runtime;
std::vector<int64_t> _finished_scanner_rows_read;
std::vector<int64_t> _finished_scanner_wait_worker_time;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 5924aa3963f..72ae8e2d73f 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -658,7 +658,7 @@ Status VFileScanner::_get_next_reader() {
}
_cur_reader.reset(nullptr);
_src_block_init = false;
- if (_next_range >= _ranges.size()) {
+ if (_next_range >= _ranges.size() || _should_stop) {
_scanner_eof = true;
_state->update_num_finished_scan_range(1);
return Status::OK();
@@ -1044,4 +1044,11 @@ Status VFileScanner::close(RuntimeState* state) {
return Status::OK();
}
+void VFileScanner::try_stop() {
+ VScanner::try_stop();
+ if (_io_ctx) {
+ _io_ctx->should_stop = true;
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.h
b/be/src/vec/exec/scan/vfile_scanner.h
index e3533ce05c2..58355cdbe36 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -71,6 +71,8 @@ public:
Status close(RuntimeState* state) override;
+ void try_stop() override;
+
Status prepare(const VExprContextSPtrs& conjuncts,
std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const std::unordered_map<std::string, int>*
colname_to_slot_id);
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 52cf88f260a..2264eab9aa4 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -90,7 +90,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block,
bool* eof) {
}
// record rows return (after filter) for _limit check
_num_rows_return += block->rows();
- } while (!state->is_cancelled() && block->rows() == 0 && !(*eof) &&
+ } while (!_should_stop && !state->is_cancelled() && block->rows() == 0
&& !(*eof) &&
_num_rows_read < rows_read_threshold);
}
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index c29aafafc04..3dd57bedb51 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -64,6 +64,9 @@ public:
virtual Status close(RuntimeState* state);
+ // Try to stop scanner, and all running readers.
+ virtual void try_stop() { _should_stop = true; };
+
virtual std::string get_name() { return ""; }
// return the readable name of current scan range.
@@ -212,8 +215,11 @@ protected:
ScannerCounter _counter;
int64_t _per_scanner_timer = 0;
+
+ bool _should_stop = false;
};
using VScannerSPtr = std::shared_ptr<VScanner>;
+using VScannerWPtr = std::weak_ptr<VScanner>;
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]