This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 6600e92b12c [scan](status) Finish execution if scanner failed (#32966)
6600e92b12c is described below

commit 6600e92b12ceace33dd326f98119fcfe979dfc4d
Author: Gabriel <[email protected]>
AuthorDate: Fri Mar 29 10:50:39 2024 +0800

    [scan](status) Finish execution if scanner failed (#32966)
---
 be/src/vec/core/block.h                    |  5 +++--
 be/src/vec/exec/scan/scanner_scheduler.cpp |  6 +++++-
 be/src/vec/exec/scan/vscanner.cpp          | 15 +++++++++------
 3 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index a9769e7b679..ce32cc5cf39 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -532,8 +532,9 @@ public:
         } else {
             if (_columns.size() != block.columns()) {
                 return Status::Error<ErrorCode::INTERNAL_ERROR>(
-                        "Merge block not match, self:[{}], input:[{}], ", dump_types(),
-                        block.dump_types());
+                        "Merge block not match, self:[columns: {}, types: {}], input:[columns: {}, "
+                        "types: {}], ",
+                        dump_names(), dump_types(), block.dump_names(), block.dump_types());
             }
             for (int i = 0; i < _columns.size(); ++i) {
                 if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index d81e327b886..571df35e55e 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -275,7 +275,11 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
             scan_task->cached_blocks.back()->rows() + free_block->rows() <= ctx->batch_size()) {
             size_t block_size = scan_task->cached_blocks.back()->allocated_bytes();
             vectorized::MutableBlock mutable_block(scan_task->cached_blocks.back().get());
-            static_cast<void>(mutable_block.merge(*free_block));
+            status = mutable_block.merge(*free_block);
+            if (!status.ok()) {
+                LOG(WARNING) << "Block merge failed: " << status.to_string();
+                break;
+            }
             scan_task->cached_blocks.back().get()->set_columns(
                     std::move(mutable_block.mutable_columns()));
             ctx->return_free_block(std::move(free_block));
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 39a9059d1d3..7354b9e085f 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -22,6 +22,7 @@
 #include "common/config.h"
 #include "pipeline/exec/scan_operator.h"
 #include "runtime/descriptors.h"
+#include "util/defer_op.h"
 #include "util/runtime_profile.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/exec/scan/vscan_node.h"
@@ -152,6 +153,14 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
 }
 
 Status VScanner::_filter_output_block(Block* block) {
+    Defer clear_tmp_block([&]() {
+        auto all_column_names = block->get_names();
+        for (auto& name : all_column_names) {
+            if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
+                block->erase(name);
+            }
+        }
+    });
     if (block->has(BeConsts::BLOCK_TEMP_COLUMN_SCANNER_FILTERED)) {
         // scanner filter_block is already done (only by _topn_next currently), just skip it
         return Status::OK();
@@ -159,12 +168,6 @@ Status VScanner::_filter_output_block(Block* block) {
     auto old_rows = block->rows();
     Status st = VExprContext::filter_block(_conjuncts, block, block->columns());
     _counter.num_rows_unselected += old_rows - block->rows();
-    auto all_column_names = block->get_names();
-    for (auto& name : all_column_names) {
-        if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
-            block->erase(name);
-        }
-    }
     return st;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to