This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 6600e92b12c [scan](status) Finish execution if scanner failed (#32966)
6600e92b12c is described below
commit 6600e92b12ceace33dd326f98119fcfe979dfc4d
Author: Gabriel <[email protected]>
AuthorDate: Fri Mar 29 10:50:39 2024 +0800
[scan](status) Finish execution if scanner failed (#32966)
---
be/src/vec/core/block.h | 5 +++--
be/src/vec/exec/scan/scanner_scheduler.cpp | 6 +++++-
be/src/vec/exec/scan/vscanner.cpp | 15 +++++++++------
3 files changed, 17 insertions(+), 9 deletions(-)
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index a9769e7b679..ce32cc5cf39 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -532,8 +532,9 @@ public:
} else {
if (_columns.size() != block.columns()) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
- "Merge block not match, self:[{}], input:[{}], ",
dump_types(),
- block.dump_types());
+ "Merge block not match, self:[columns: {}, types: {}],
input:[columns: {}, "
+ "types: {}], ",
+ dump_names(), dump_types(), block.dump_names(),
block.dump_types());
}
for (int i = 0; i < _columns.size(); ++i) {
if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index d81e327b886..571df35e55e 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -275,7 +275,11 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
scan_task->cached_blocks.back()->rows() + free_block->rows() <=
ctx->batch_size()) {
size_t block_size =
scan_task->cached_blocks.back()->allocated_bytes();
vectorized::MutableBlock
mutable_block(scan_task->cached_blocks.back().get());
- static_cast<void>(mutable_block.merge(*free_block));
+ status = mutable_block.merge(*free_block);
+ if (!status.ok()) {
+ LOG(WARNING) << "Block merge failed: " << status.to_string();
+ break;
+ }
scan_task->cached_blocks.back().get()->set_columns(
std::move(mutable_block.mutable_columns()));
ctx->return_free_block(std::move(free_block));
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 39a9059d1d3..7354b9e085f 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -22,6 +22,7 @@
#include "common/config.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
+#include "util/defer_op.h"
#include "util/runtime_profile.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/exec/scan/vscan_node.h"
@@ -152,6 +153,14 @@ Status VScanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
}
Status VScanner::_filter_output_block(Block* block) {
+ Defer clear_tmp_block([&]() {
+ auto all_column_names = block->get_names();
+ for (auto& name : all_column_names) {
+ if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
+ block->erase(name);
+ }
+ }
+ });
if (block->has(BeConsts::BLOCK_TEMP_COLUMN_SCANNER_FILTERED)) {
// scanner filter_block is already done (only by _topn_next
currently), just skip it
return Status::OK();
@@ -159,12 +168,6 @@ Status VScanner::_filter_output_block(Block* block) {
auto old_rows = block->rows();
Status st = VExprContext::filter_block(_conjuncts, block,
block->columns());
_counter.num_rows_unselected += old_rows - block->rows();
- auto all_column_names = block->get_names();
- for (auto& name : all_column_names) {
- if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
- block->erase(name);
- }
- }
return st;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]