This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f23e8bf323f [fix](scanner) Check query status when iterating through
rowsets and segments (#41363)
f23e8bf323f is described below
commit f23e8bf323fcc8c6ef15fbcfe01067eef9122caf
Author: zhiqiang <[email protected]>
AuthorDate: Sat Sep 28 10:12:25 2024 +0800
[fix](scanner) Check query status when iterating through rowsets and
segments (#41363)
This prevents the scanner from being unable to exit while it is doing large IO.
---
be/src/olap/rowset/beta_rowset_reader.cpp | 19 +++++++++++++++++++
be/src/vec/olap/block_reader.cpp | 6 ++++++
be/src/vec/olap/vcollect_iterator.cpp | 18 ++++++++++++++++++
3 files changed, 43 insertions(+)
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 5fdb2d7c41a..d2c7023f659 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -351,6 +351,11 @@ Status BetaRowsetReader::next_block(vectorized::Block*
block) {
return Status::Error<END_OF_FILE>("BetaRowsetReader is empty");
}
+ RuntimeState* runtime_state = nullptr;
+ if (_read_context != nullptr) {
+ runtime_state = _read_context->runtime_state;
+ }
+
do {
auto s = _iterator->next_batch(block);
if (!s.ok()) {
@@ -359,6 +364,10 @@ Status BetaRowsetReader::next_block(vectorized::Block*
block) {
}
return s;
}
+
+ if (runtime_state != nullptr && runtime_state->is_cancelled())
[[unlikely]] {
+ return runtime_state->cancel_reason();
+ }
} while (block->empty());
return Status::OK();
@@ -367,6 +376,12 @@ Status BetaRowsetReader::next_block(vectorized::Block*
block) {
Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
RETURN_IF_ERROR(_init_iterator_once());
+
+ RuntimeState* runtime_state = nullptr;
+ if (_read_context != nullptr) {
+ runtime_state = _read_context->runtime_state;
+ }
+
do {
auto s = _iterator->next_block_view(block_view);
if (!s.ok()) {
@@ -375,6 +390,10 @@ Status
BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
}
return s;
}
+
+ if (runtime_state != nullptr && runtime_state->is_cancelled())
[[unlikely]] {
+ return runtime_state->cancel_reason();
+ }
} while (block_view->empty());
return Status::OK();
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index e2b4ba39e12..9d79b51975c 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -39,6 +39,7 @@
#include "olap/rowset/rowset_reader_context.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
+#include "runtime/runtime_state.h"
#include "vec/aggregate_functions/aggregate_function_reader.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
@@ -135,8 +136,13 @@ Status BlockReader::_init_collect_iter(const ReaderParams&
read_params) {
read_params.read_orderby_key_reverse);
std::vector<RowsetReaderSharedPtr> valid_rs_readers;
+ RuntimeState* runtime_state = read_params.runtime_state;
for (int i = 0; i < read_params.rs_splits.size(); ++i) {
+ if (runtime_state != nullptr && runtime_state->is_cancelled()) {
+ return runtime_state->cancel_reason();
+ }
+
auto& rs_split = read_params.rs_splits[i];
// _vcollect_iter.topn_next() will init rs_reader by itself
diff --git a/be/src/vec/olap/vcollect_iterator.cpp
b/be/src/vec/olap/vcollect_iterator.cpp
index 3eb768ff803..f7017a058df 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -490,6 +490,11 @@ int64_t VCollectIterator::Level0Iterator::version() const {
}
Status VCollectIterator::Level0Iterator::refresh_current_row() {
+ RuntimeState* runtime_state = nullptr;
+ if (_reader != nullptr) {
+ runtime_state = _reader->_reader_context.runtime_state;
+ }
+
do {
if (_block == nullptr && !_get_data_by_ref) {
_block = std::make_shared<Block>(_schema.create_block(
@@ -501,6 +506,10 @@ Status
VCollectIterator::Level0Iterator::refresh_current_row() {
} else {
_reset();
auto res = _refresh();
+
+ if (runtime_state != nullptr && runtime_state->is_cancelled())
[[unlikely]] {
+ return runtime_state->cancel_reason();
+ }
if (!res.ok() && !res.is<END_OF_FILE>()) {
return res;
}
@@ -677,8 +686,17 @@ Status VCollectIterator::Level1Iterator::init(bool
get_data_by_ref) {
}
Status VCollectIterator::Level1Iterator::ensure_first_row_ref() {
+ RuntimeState* runtime_state = nullptr;
+ if (_reader != nullptr) {
+ runtime_state = _reader->_reader_context.runtime_state;
+ }
+
for (auto iter = _children.begin(); iter != _children.end();) {
auto s = (*iter)->ensure_first_row_ref();
+ if (runtime_state != nullptr && runtime_state->is_cancelled()) {
+ return runtime_state->cancel_reason();
+ }
+
if (!s.ok()) {
iter = _children.erase(iter);
if (!s.is<END_OF_FILE>()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]