This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 97e63516b74 [fix](streamload) catch exception when reading arrow data (#28558)
97e63516b74 is described below
commit 97e63516b741a6edc3f363d08a2bce4ed0841e8e
Author: wuwenchi <[email protected]>
AuthorDate: Mon Dec 18 22:03:57 2023 +0800
[fix](streamload) catch exception when reading arrow data (#28558)
---
.../vec/exec/format/arrow/arrow_stream_reader.cpp | 34 +++++++++++++---------
1 file changed, 20 insertions(+), 14 deletions(-)
diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
index 29ee451e3ac..33ac46a8279 100644
--- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
+++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
@@ -65,22 +65,24 @@ Status ArrowStreamReader::get_next_block(Block* block,
size_t* read_rows, bool*
}
// create a reader to read data
- arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchStreamReader>> tRet =
+ arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchStreamReader>>
res_open =
arrow::ipc::RecordBatchStreamReader::Open(_pip_stream.get(),
arrow::ipc::IpcReadOptions::Defaults());
- if (!tRet.ok()) {
- LOG(WARNING) << "failed to open stream reader: " <<
tRet.status().message();
- return Status::InternalError("failed to open stream reader: {}",
tRet.status().message());
+ if (!res_open.ok()) {
+ LOG(WARNING) << "failed to open stream reader: " <<
res_open.status().message();
+ return Status::InternalError("failed to open stream reader: {}",
+ res_open.status().message());
}
- auto reader = std::move(tRet).ValueUnsafe();
+ auto reader = std::move(res_open).ValueUnsafe();
// get arrow data from reader
- arrow::Result<arrow::RecordBatchVector> tRet2 = reader->ToRecordBatches();
- if (!tRet2.ok()) {
- LOG(WARNING) << "failed to read batch: " << tRet2.status().message();
- return Status::InternalError("failed to read batch: {}",
tRet2.status().message());
+ arrow::Result<arrow::RecordBatchVector> res_reader =
reader->ToRecordBatches();
+ if (!res_reader.ok()) {
+ LOG(WARNING) << "failed to read batch: " <<
res_reader.status().message();
+ return Status::InternalError("failed to read batch: {}",
res_reader.status().message());
}
- std::vector<std::shared_ptr<arrow::RecordBatch>> out_batches =
std::move(tRet2).ValueUnsafe();
+ std::vector<std::shared_ptr<arrow::RecordBatch>> out_batches =
+ std::move(res_reader).ValueUnsafe();
// convert arrow batch to block
auto columns = block->mutate_columns();
@@ -94,10 +96,14 @@ Status ArrowStreamReader::get_next_block(Block* block,
size_t* read_rows, bool*
std::string column_name = batch.schema()->field(c)->name();
- vectorized::ColumnWithTypeAndName& column_with_name =
block->get_by_name(column_name);
-
- column_with_name.type->get_serde()->read_column_from_arrow(
- column_with_name.column->assume_mutable_ref(), column, 0,
num_rows, _ctzz);
+ try {
+ vectorized::ColumnWithTypeAndName& column_with_name =
+ block->get_by_name(column_name);
+ column_with_name.type->get_serde()->read_column_from_arrow(
+ column_with_name.column->assume_mutable_ref(), column,
0, num_rows, _ctzz);
+ } catch (Exception& e) {
+ return Status::InternalError("Failed to convert from arrow to
block: {}", e.what());
+ }
}
*read_rows += batch.num_rows();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]