This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d3a247eb1aa [fix](arrow-flight) Support SerDe of Doris string larger 
than 2GB to Arrow large string   (#51265)
d3a247eb1aa is described below

commit d3a247eb1aab710e55e619866024edf73e85811a
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu May 29 10:40:26 2025 +0800

    [fix](arrow-flight) Support SerDe of Doris string larger than 2GB to Arrow 
large string   (#51265)
    
    ### What problem does this PR solve?
    
    In Arrow type, `utf8` is smaller than 2GB, and `large_utf8` is used for
    larger than 2GB.
    
    Fix:
    ```
    arrow serde with arrow: utf8 with column : String with error msg: Capacity 
error: array cannot contain more than 2147483646 bytes
    ```
---
 .../arrow_flight/arrow_flight_batch_reader.cpp     | 12 +++-
 .../arrow_flight/arrow_flight_batch_reader.h       |  3 +
 be/src/util/arrow/block_convertor.cpp              | 14 +++--
 be/src/util/arrow/row_batch.h                      |  2 +
 be/src/util/arrow/utils.h                          | 10 ++++
 .../vec/data_types/serde/data_type_array_serde.cpp | 25 ++++----
 .../vec/data_types/serde/data_type_array_serde.h   | 10 ++--
 .../data_types/serde/data_type_bitmap_serde.cpp    | 15 ++---
 .../vec/data_types/serde/data_type_bitmap_serde.h  | 16 +++---
 .../data_types/serde/data_type_date64_serde.cpp    | 48 ++++++++--------
 .../vec/data_types/serde/data_type_date64_serde.h  | 18 +++---
 .../serde/data_type_datetimev2_serde.cpp           | 32 ++++++-----
 .../data_types/serde/data_type_datetimev2_serde.h  | 10 ++--
 .../data_types/serde/data_type_datev2_serde.cpp    | 23 ++++----
 .../vec/data_types/serde/data_type_datev2_serde.h  | 10 ++--
 .../data_types/serde/data_type_decimal_serde.cpp   | 66 +++++++++++-----------
 .../vec/data_types/serde/data_type_decimal_serde.h | 10 ++--
 .../vec/data_types/serde/data_type_hll_serde.cpp   | 15 ++---
 be/src/vec/data_types/serde/data_type_hll_serde.h  | 14 ++---
 .../vec/data_types/serde/data_type_ipv4_serde.cpp  | 23 ++++----
 be/src/vec/data_types/serde/data_type_ipv4_serde.h | 10 ++--
 .../vec/data_types/serde/data_type_ipv6_serde.cpp  | 31 +++++-----
 be/src/vec/data_types/serde/data_type_ipv6_serde.h | 10 ++--
 .../vec/data_types/serde/data_type_jsonb_serde.cpp | 18 +++---
 .../vec/data_types/serde/data_type_jsonb_serde.h   |  6 +-
 .../vec/data_types/serde/data_type_map_serde.cpp   | 57 +++++++++++--------
 be/src/vec/data_types/serde/data_type_map_serde.h  | 10 ++--
 .../vec/data_types/serde/data_type_nothing_serde.h | 18 +++---
 .../data_types/serde/data_type_nullable_serde.cpp  | 20 ++++---
 .../data_types/serde/data_type_nullable_serde.h    | 10 ++--
 .../data_types/serde/data_type_number_serde.cpp    | 47 +++++++--------
 .../vec/data_types/serde/data_type_number_serde.h  | 10 ++--
 .../data_types/serde/data_type_object_serde.cpp    | 24 ++++----
 .../vec/data_types/serde/data_type_object_serde.h  | 14 ++---
 .../serde/data_type_quantilestate_serde.h          | 26 +++++----
 be/src/vec/data_types/serde/data_type_serde.h      | 22 ++++----
 .../vec/data_types/serde/data_type_string_serde.h  | 54 ++++++++++++++----
 .../data_types/serde/data_type_struct_serde.cpp    | 30 +++++-----
 .../vec/data_types/serde/data_type_struct_serde.h  | 10 ++--
 .../vec/exec/format/arrow/arrow_stream_reader.cpp  |  5 +-
 be/src/vec/utils/arrow_column_to_doris_column.cpp  |  7 ++-
 .../vec/data_types/common_data_type_serder_test.h  | 11 ++--
 .../serde/data_type_serde_arrow_test.cpp           | 20 +++++++
 43 files changed, 490 insertions(+), 356 deletions(-)

diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp 
b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
index 969e743040c..07e46cfcfed 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
@@ -90,7 +90,7 @@ arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>> 
ArrowFlightBatchLoca
     return result;
 }
 
-arrow::Status 
ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) 
{
+arrow::Status 
ArrowFlightBatchLocalReader::ReadNextImpl(std::shared_ptr<arrow::RecordBatch>* 
out) {
     // parameter *out not nullptr
     *out = nullptr;
     SCOPED_ATTACH_TASK(_mem_tracker);
@@ -124,6 +124,10 @@ arrow::Status 
ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::Recor
     return arrow::Status::OK();
 }
 
+arrow::Status 
ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) 
{
+    RETURN_ARROW_STATUS_IF_CATCH_EXCEPTION(ReadNextImpl(out));
+}
+
 ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader(
         const std::shared_ptr<QueryStatement>& statement,
         const std::shared_ptr<PBackendService_Stub>& stub)
@@ -284,7 +288,7 @@ arrow::Status ArrowFlightBatchRemoteReader::init_schema() {
     return arrow::Status::OK();
 }
 
-arrow::Status 
ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* 
out) {
+arrow::Status 
ArrowFlightBatchRemoteReader::ReadNextImpl(std::shared_ptr<arrow::RecordBatch>* 
out) {
     // parameter *out not nullptr
     *out = nullptr;
     SCOPED_ATTACH_TASK(_mem_tracker);
@@ -310,4 +314,8 @@ arrow::Status 
ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::Reco
     return arrow::Status::OK();
 }
 
+arrow::Status 
ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* 
out) {
+    RETURN_ARROW_STATUS_IF_CATCH_EXCEPTION(ReadNextImpl(out));
+}
+
 } // namespace doris::flight
diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.h 
b/be/src/service/arrow_flight/arrow_flight_batch_reader.h
index 612ebc8063c..a1b25511b8c 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.h
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.h
@@ -77,6 +77,8 @@ private:
     ArrowFlightBatchLocalReader(const std::shared_ptr<QueryStatement>& 
statement,
                                 const std::shared_ptr<arrow::Schema>& schema,
                                 const std::shared_ptr<MemTrackerLimiter>& 
mem_tracker);
+
+    arrow::Status ReadNextImpl(std::shared_ptr<arrow::RecordBatch>* out);
 };
 
 class ArrowFlightBatchRemoteReader : public ArrowFlightBatchReaderBase {
@@ -94,6 +96,7 @@ private:
     ArrowFlightBatchRemoteReader(const std::shared_ptr<QueryStatement>& 
statement,
                                  const std::shared_ptr<PBackendService_Stub>& 
stub);
 
+    arrow::Status ReadNextImpl(std::shared_ptr<arrow::RecordBatch>* out);
     arrow::Status _fetch_schema();
     arrow::Status _fetch_data();
 
diff --git a/be/src/util/arrow/block_convertor.cpp 
b/be/src/util/arrow/block_convertor.cpp
index 17dd294b9e2..b13392360b2 100644
--- a/be/src/util/arrow/block_convertor.cpp
+++ b/be/src/util/arrow/block_convertor.cpp
@@ -97,17 +97,21 @@ Status 
FromBlockConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) {
         _cur_rows = _block.rows();
         _cur_col = _block.get_by_position(idx).column;
         _cur_type = _block.get_by_position(idx).type;
+        auto column = _cur_col->convert_to_full_column_if_const();
+        auto arrow_type = _schema->field(idx)->type();
+        if (arrow_type->name() == "utf8" && column->byte_size() >= 
MAX_ARROW_UTF8) {
+            arrow_type = arrow::large_utf8();
+        }
         std::unique_ptr<arrow::ArrayBuilder> builder;
-        auto arrow_st = arrow::MakeBuilder(_pool, _schema->field(idx)->type(), 
&builder);
+        auto arrow_st = arrow::MakeBuilder(_pool, arrow_type, &builder);
         if (!arrow_st.ok()) {
             return to_doris_status(arrow_st);
         }
         _cur_builder = builder.get();
-        auto column = _cur_col->convert_to_full_column_if_const();
         try {
-            _cur_type->get_serde()->write_column_to_arrow(*column, nullptr, 
_cur_builder,
-                                                          _cur_start, 
_cur_start + _cur_rows,
-                                                          _timezone_obj);
+            RETURN_IF_ERROR(_cur_type->get_serde()->write_column_to_arrow(
+                    *column, nullptr, _cur_builder, _cur_start, _cur_start + 
_cur_rows,
+                    _timezone_obj));
         } catch (std::exception& e) {
             return Status::InternalError(
                     "Fail to convert block data to arrow data, type: {}, name: 
{}, error: {}",
diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h
index db7b70232d2..6dafbe8cab6 100644
--- a/be/src/util/arrow/row_batch.h
+++ b/be/src/util/arrow/row_batch.h
@@ -39,6 +39,8 @@ class Schema;
 
 namespace doris {
 
+constexpr size_t MAX_ARROW_UTF8 = (1ULL << 21); // 2G
+
 class RowDescriptor;
 
 Status convert_to_arrow_type(const vectorized::DataTypePtr& type,
diff --git a/be/src/util/arrow/utils.h b/be/src/util/arrow/utils.h
index c992dabee57..0a731bafbd5 100644
--- a/be/src/util/arrow/utils.h
+++ b/be/src/util/arrow/utils.h
@@ -55,6 +55,16 @@ namespace doris {
         }                                     \
     } while (false)
 
+#define RETURN_ARROW_STATUS_IF_CATCH_EXCEPTION(stmt)                           
    \
+    do {                                                                       
    \
+        try {                                                                  
    \
+            arrow::Status _status_ = (stmt);                                   
    \
+            return _status_;                                                   
    \
+        } catch (const doris::Exception& e) {                                  
    \
+            return to_arrow_status(Status::Error<false>(e.code(), 
e.to_string())); \
+        }                                                                      
    \
+    } while (0)
+
 // Pretty print a arrow RecordBatch.
 Status arrow_pretty_print(const arrow::RecordBatch& rb, std::ostream* os);
 Status arrow_pretty_print(const arrow::Array& rb, std::ostream* os);
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp 
b/be/src/vec/data_types/serde/data_type_array_serde.cpp
index ea748cdd6c4..dc90a67a74c 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp
@@ -271,9 +271,9 @@ void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn& 
column, const JsonbVa
     column.deserialize_and_insert_from_arena(blob->getBlob());
 }
 
-void DataTypeArraySerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                               arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                               int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeArraySerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
+                                                 arrow::ArrayBuilder* 
array_builder, int64_t start,
+                                                 int64_t end, const 
cctz::time_zone& ctz) const {
     const auto& array_column = static_cast<const ColumnArray&>(column);
     const auto& offsets = array_column.get_offsets();
     const auto& nested_data = array_column.get_data();
@@ -281,19 +281,22 @@ void DataTypeArraySerDe::write_column_to_arrow(const 
IColumn& column, const Null
     auto* nested_builder = builder.value_builder();
     for (size_t array_idx = start; array_idx < end; ++array_idx) {
         if (null_map && (*null_map)[array_idx]) {
-            checkArrowStatus(builder.AppendNull(), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                             array_builder->type()->name()));
             continue;
         }
-        checkArrowStatus(builder.Append(), column.get_name(), 
array_builder->type()->name());
-        nested_serde->write_column_to_arrow(nested_data, nullptr, 
nested_builder,
-                                            offsets[array_idx - 1], 
offsets[array_idx], ctz);
+        RETURN_IF_ERROR(checkArrowStatus(builder.Append(), column.get_name(),
+                                         array_builder->type()->name()));
+        RETURN_IF_ERROR(nested_serde->write_column_to_arrow(nested_data, 
nullptr, nested_builder,
+                                                            offsets[array_idx 
- 1],
+                                                            
offsets[array_idx], ctz));
     }
+    return Status::OK();
 }
 
-void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
-                                                int64_t start, int64_t end,
-                                                const cctz::time_zone& ctz) 
const {
+Status DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                  int64_t start, int64_t end,
+                                                  const cctz::time_zone& ctz) 
const {
     auto& column_array = static_cast<ColumnArray&>(column);
     auto& offsets_data = column_array.get_offsets();
     const auto* concrete_array = dynamic_cast<const 
arrow::ListArray*>(arrow_array);
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h 
b/be/src/vec/data_types/serde/data_type_array_serde.h
index f9d852b3843..8378c79cae2 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.h
+++ b/be/src/vec/data_types/serde/data_type_array_serde.h
@@ -78,11 +78,11 @@ public:
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
                                  int64_t row_idx, bool col_const,
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp 
b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
index 2e4f8d72c6f..a506573531a 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
@@ -121,24 +121,25 @@ void DataTypeBitMapSerDe::write_one_cell_to_jsonb(const 
IColumn& column, JsonbWr
     result.writeEndBinary();
 }
 
-void DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                                arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                                int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
+                                                  arrow::ArrayBuilder* 
array_builder, int64_t start,
+                                                  int64_t end, const 
cctz::time_zone& ctz) const {
     const auto& col = assert_cast<const ColumnBitmap&>(column);
     auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
     for (size_t string_i = start; string_i < end; ++string_i) {
         if (null_map && (*null_map)[string_i]) {
-            checkArrowStatus(builder.AppendNull(), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                             array_builder->type()->name()));
         } else {
             auto& bitmap_value = 
const_cast<BitmapValue&>(col.get_element(string_i));
             std::string memory_buffer(bitmap_value.getSizeInBytes(), '0');
             bitmap_value.write_to(memory_buffer.data());
-            checkArrowStatus(
+            RETURN_IF_ERROR(checkArrowStatus(
                     builder.Append(memory_buffer.data(), 
static_cast<int>(memory_buffer.size())),
-                    column.get_name(), array_builder->type()->name());
+                    column.get_name(), array_builder->type()->name()));
         }
     }
+    return Status::OK();
 }
 
 void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const 
JsonbValue* arg) const {
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h 
b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
index 22c450cd27e..ab99157319a 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
@@ -57,14 +57,14 @@ public:
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
-                               "read_column_from_arrow with type " + 
column.get_name());
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override {
+        return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+                             "read_column_from_arrow with type " + 
column.get_name());
     }
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp 
b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
index 9e3e12b0313..03a10b554c7 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
@@ -169,17 +169,18 @@ Status 
DataTypeDateTimeSerDe::deserialize_one_cell_from_json(IColumn& column, Sl
     return Status::OK();
 }
 
-void DataTypeDateTimeSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
-                                                   int64_t start, int64_t end,
-                                                   const cctz::time_zone& ctz) 
const {
-    _read_column_from_arrow<false>(column, arrow_array, start, end, ctz);
+Status DataTypeDateTimeSerDe::read_column_from_arrow(IColumn& column,
+                                                     const arrow::Array* 
arrow_array, int64_t start,
+                                                     int64_t end,
+                                                     const cctz::time_zone& 
ctz) const {
+    return _read_column_from_arrow<false>(column, arrow_array, start, end, 
ctz);
 }
 
 template <PrimitiveType T>
-void DataTypeDate64SerDe<T>::write_column_to_arrow(const IColumn& column, 
const NullMap* null_map,
-                                                   arrow::ArrayBuilder* 
array_builder,
-                                                   int64_t start, int64_t end,
-                                                   const cctz::time_zone& ctz) 
const {
+Status DataTypeDate64SerDe<T>::write_column_to_arrow(const IColumn& column, 
const NullMap* null_map,
+                                                     arrow::ArrayBuilder* 
array_builder,
+                                                     int64_t start, int64_t 
end,
+                                                     const cctz::time_zone& 
ctz) const {
     auto& col_data = static_cast<const 
ColumnVector<Int64>&>(column).get_data();
     auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder);
     for (size_t i = start; i < end; ++i) {
@@ -187,13 +188,14 @@ void DataTypeDate64SerDe<T>::write_column_to_arrow(const 
IColumn& column, const
         const VecDateTimeValue* time_val = (const 
VecDateTimeValue*)(&col_data[i]);
         size_t len = time_val->to_buffer(buf);
         if (null_map && (*null_map)[i]) {
-            checkArrowStatus(string_builder.AppendNull(), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(string_builder.AppendNull(), 
column.get_name(),
+                                             array_builder->type()->name()));
         } else {
-            checkArrowStatus(string_builder.Append(buf, 
cast_set<int32_t>(len)), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(string_builder.Append(buf, 
cast_set<int32_t>(len)),
+                                             column.get_name(), 
array_builder->type()->name()));
         }
     }
+    return Status::OK();
 }
 
 static int64_t time_unit_divisor(arrow::TimeUnit::type unit) {
@@ -218,10 +220,10 @@ static int64_t time_unit_divisor(arrow::TimeUnit::type 
unit) {
 
 template <PrimitiveType T>
 template <bool is_date>
-void DataTypeDate64SerDe<T>::_read_column_from_arrow(IColumn& column,
-                                                     const arrow::Array* 
arrow_array, int64_t start,
-                                                     int64_t end,
-                                                     const cctz::time_zone& 
ctz) const {
+Status DataTypeDate64SerDe<T>::_read_column_from_arrow(IColumn& column,
+                                                       const arrow::Array* 
arrow_array,
+                                                       int64_t start, int64_t 
end,
+                                                       const cctz::time_zone& 
ctz) const {
     auto& col_data = static_cast<ColumnVector<Int64>&>(column).get_data();
     int64_t divisor = 1;
     int64_t multiplier = 1;
@@ -271,16 +273,18 @@ void 
DataTypeDate64SerDe<T>::_read_column_from_arrow(IColumn& column,
             col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
         }
     } else {
-        throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
-                               "Unsupported Arrow Type: " + 
arrow_array->type()->name());
+        return Status::Error(doris::ErrorCode::INVALID_ARGUMENT,
+                             "Unsupported Arrow Type: " + 
arrow_array->type()->name());
     }
+    return Status::OK();
 }
 
 template <PrimitiveType T>
-void DataTypeDate64SerDe<T>::read_column_from_arrow(IColumn& column,
-                                                    const arrow::Array* 
arrow_array, int64_t start,
-                                                    int64_t end, const 
cctz::time_zone& ctz) const {
-    _read_column_from_arrow<true>(column, arrow_array, start, end, ctz);
+Status DataTypeDate64SerDe<T>::read_column_from_arrow(IColumn& column,
+                                                      const arrow::Array* 
arrow_array,
+                                                      int64_t start, int64_t 
end,
+                                                      const cctz::time_zone& 
ctz) const {
+    return _read_column_from_arrow<true>(column, arrow_array, start, end, ctz);
 }
 
 template <PrimitiveType T>
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.h 
b/be/src/vec/data_types/serde/data_type_date64_serde.h
index caea5a3b551..944ef7bc977 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.h
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.h
@@ -60,11 +60,11 @@ public:
             IColumn& column, std::vector<Slice>& slices, uint64_t* 
num_deserialized,
             const typename DataTypeNumberSerDe<T>::FormatOptions& options) 
const override;
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
     Status write_column_to_mysql(
             const IColumn& column, MysqlRowBuffer<true>& row_buffer, int64_t 
row_idx,
             bool col_const,
@@ -81,8 +81,8 @@ public:
 
 protected:
     template <bool is_date>
-    void _read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                 int64_t end, const cctz::time_zone& ctz) 
const;
+    Status _read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                   int64_t end, const cctz::time_zone& ctz) 
const;
 
 private:
     template <bool is_binary_format>
@@ -107,8 +107,8 @@ public:
     Status deserialize_column_from_json_vector(IColumn& column, 
std::vector<Slice>& slices,
                                                uint64_t* num_deserialized,
                                                const FormatOptions& options) 
const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp 
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
index 7dcd2bba6ff..3cfa17244ad 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -108,16 +108,17 @@ Status 
DataTypeDateTimeV2SerDe::deserialize_one_cell_from_json(IColumn& column,
     return Status::OK();
 }
 
-void DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column, 
const NullMap* null_map,
-                                                    arrow::ArrayBuilder* 
array_builder,
-                                                    int64_t start, int64_t end,
-                                                    const cctz::time_zone& 
ctz) const {
+Status DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column,
+                                                      const NullMap* null_map,
+                                                      arrow::ArrayBuilder* 
array_builder,
+                                                      int64_t start, int64_t 
end,
+                                                      const cctz::time_zone& 
ctz) const {
     const auto& col_data = static_cast<const 
ColumnVector<UInt64>&>(column).get_data();
     auto& timestamp_builder = 
assert_cast<arrow::TimestampBuilder&>(*array_builder);
     for (size_t i = start; i < end; ++i) {
         if (null_map && (*null_map)[i]) {
-            checkArrowStatus(timestamp_builder.AppendNull(), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(timestamp_builder.AppendNull(), 
column.get_name(),
+                                             array_builder->type()->name()));
         } else {
             int64_t timestamp = 0;
             DateV2Value<DateTimeV2ValueType> datetime_val =
@@ -131,16 +132,17 @@ void DataTypeDateTimeV2SerDe::write_column_to_arrow(const 
IColumn& column, const
                 uint32_t millisecond = datetime_val.microsecond() / 1000;
                 timestamp = (timestamp * 1000) + millisecond;
             }
-            checkArrowStatus(timestamp_builder.Append(timestamp), 
column.get_name(),
-                             array_builder->type()->name());
+            
RETURN_IF_ERROR(checkArrowStatus(timestamp_builder.Append(timestamp), 
column.get_name(),
+                                             array_builder->type()->name()));
         }
     }
+    return Status::OK();
 }
 
-void DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
-                                                     const arrow::Array* 
arrow_array, int64_t start,
-                                                     int64_t end,
-                                                     const cctz::time_zone& 
ctz) const {
+Status DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
+                                                       const arrow::Array* 
arrow_array,
+                                                       int64_t start, int64_t 
end,
+                                                       const cctz::time_zone& 
ctz) const {
     auto& col_data = static_cast<ColumnDateTimeV2&>(column).get_data();
     int64_t divisor = 1;
     if (arrow_array->type()->id() == arrow::Type::TIMESTAMP) {
@@ -165,7 +167,8 @@ void 
DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
         }
         default: {
             LOG(WARNING) << "not support convert to datetimev2 from 
time_unit:" << type->unit();
-            return;
+            return Status::InvalidArgument("not support convert to datetimev2 
from time_unit: {}",
+                                           type->unit());
         }
         }
         for (auto value_i = start; value_i < end; ++value_i) {
@@ -184,7 +187,10 @@ void 
DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
     } else {
         LOG(WARNING) << "not support convert to datetimev2 from arrow type:"
                      << arrow_array->type()->id();
+        return Status::InternalError("not support convert to datetimev2 from 
arrow type: {}",
+                                     arrow_array->type()->id());
     }
+    return Status::OK();
 }
 
 template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.h 
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
index 48154d0cae5..899e6e1b1e7 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
@@ -59,11 +59,11 @@ public:
                                                uint64_t* num_deserialized,
                                                const FormatOptions& options) 
const override;
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
                                  int64_t row_idx, bool col_const,
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp 
b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
index f1bcbdc266c..1c5109c9d27 100644
--- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
@@ -91,27 +91,29 @@ Status 
DataTypeDateV2SerDe::deserialize_one_cell_from_json(IColumn& column, Slic
     return Status::OK();
 }
 
-void DataTypeDateV2SerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                                arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                                int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeDateV2SerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
+                                                  arrow::ArrayBuilder* 
array_builder, int64_t start,
+                                                  int64_t end, const 
cctz::time_zone& ctz) const {
     const auto& col_data = static_cast<const 
ColumnVector<UInt32>&>(column).get_data();
     auto& date32_builder = assert_cast<arrow::Date32Builder&>(*array_builder);
     for (size_t i = start; i < end; ++i) {
         auto daynr = binary_cast<UInt32, 
DateV2Value<DateV2ValueType>>(col_data[i]).daynr() -
                      date_threshold;
         if (null_map && (*null_map)[i]) {
-            checkArrowStatus(date32_builder.AppendNull(), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(date32_builder.AppendNull(), 
column.get_name(),
+                                             array_builder->type()->name()));
         } else {
-            checkArrowStatus(date32_builder.Append(cast_set<int, int64_t, 
false>(daynr)),
-                             column.get_name(), array_builder->type()->name());
+            RETURN_IF_ERROR(
+                    checkArrowStatus(date32_builder.Append(cast_set<int, 
int64_t, false>(daynr)),
+                                     column.get_name(), 
array_builder->type()->name()));
         }
     }
+    return Status::OK();
 }
 
-void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
-                                                 int64_t start, int64_t end,
-                                                 const cctz::time_zone& ctz) 
const {
+Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                   int64_t start, int64_t end,
+                                                   const cctz::time_zone& ctz) 
const {
     auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data();
     const auto* concrete_array = dynamic_cast<const 
arrow::Date32Array*>(arrow_array);
     for (auto value_i = start; value_i < end; ++value_i) {
@@ -119,6 +121,7 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& 
column, const arrow::A
         v.get_date_from_daynr(concrete_array->Value(value_i) + date_threshold);
         col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, 
UInt32>(v));
     }
+    return Status::OK();
 }
 
 template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.h 
b/be/src/vec/data_types/serde/data_type_datev2_serde.h
index 9d2d66384be..58f441e05f4 100644
--- a/be/src/vec/data_types/serde/data_type_datev2_serde.h
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.h
@@ -58,11 +58,11 @@ public:
                                                uint64_t* num_deserialized,
                                                const FormatOptions& options) 
const override;
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
                                  int64_t row_idx, bool col_const,
                                  const FormatOptions& options) const override;
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp 
b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
index 891491b09f1..0569438cdf6 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
@@ -87,10 +87,11 @@ Status 
DataTypeDecimalSerDe<T>::deserialize_one_cell_from_json(IColumn& column,
 }
 
 template <typename T>
-void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column, 
const NullMap* null_map,
-                                                    arrow::ArrayBuilder* 
array_builder,
-                                                    int64_t start, int64_t end,
-                                                    const cctz::time_zone& 
ctz) const {
+Status DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column,
+                                                      const NullMap* null_map,
+                                                      arrow::ArrayBuilder* 
array_builder,
+                                                      int64_t start, int64_t 
end,
+                                                      const cctz::time_zone& 
ctz) const {
     auto& col = reinterpret_cast<const ColumnDecimal<T>&>(column);
     if constexpr (std::is_same_v<T, Decimal<Int128>>) {
         auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
@@ -98,8 +99,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                 std::make_shared<arrow::Decimal128Type>(27, 9);
         for (size_t i = start; i < end; ++i) {
             if (null_map && (*null_map)[i]) {
-                checkArrowStatus(builder.AppendNull(), column.get_name(),
-                                 array_builder->type()->name());
+                RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                                 
array_builder->type()->name()));
                 continue;
             }
             const auto& data_ref = col.get_data_at(i);
@@ -107,8 +108,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
             int64_t high = (p_value->value) >> 64;
             uint64 low = cast_set<uint64>((p_value->value) & 
0xFFFFFFFFFFFFFFFF);
             arrow::Decimal128 value(high, low);
-            checkArrowStatus(builder.Append(value), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.Append(value), 
column.get_name(),
+                                             array_builder->type()->name()));
         }
     } else if constexpr (std::is_same_v<T, Decimal128V3>) {
         auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
@@ -116,8 +117,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                 std::make_shared<arrow::Decimal128Type>(38, col.get_scale());
         for (size_t i = start; i < end; ++i) {
             if (null_map && (*null_map)[i]) {
-                checkArrowStatus(builder.AppendNull(), column.get_name(),
-                                 array_builder->type()->name());
+                RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                                 
array_builder->type()->name()));
                 continue;
             }
             const auto& data_ref = col.get_data_at(i);
@@ -125,8 +126,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
             int64_t high = (p_value->value) >> 64;
             uint64 low = cast_set<uint64>((p_value->value) & 
0xFFFFFFFFFFFFFFFF);
             arrow::Decimal128 value(high, low);
-            checkArrowStatus(builder.Append(value), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.Append(value), 
column.get_name(),
+                                             array_builder->type()->name()));
         }
     } else if constexpr (std::is_same_v<T, Decimal<Int32>>) {
         auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
@@ -134,14 +135,14 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                 std::make_shared<arrow::Decimal128Type>(8, col.get_scale());
         for (size_t i = start; i < end; ++i) {
             if (null_map && (*null_map)[i]) {
-                checkArrowStatus(builder.AppendNull(), column.get_name(),
-                                 array_builder->type()->name());
+                RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                                 
array_builder->type()->name()));
                 continue;
             }
             Int128 p_value = Int128(col.get_element(i));
             arrow::Decimal128 value(reinterpret_cast<const 
uint8_t*>(&p_value));
-            checkArrowStatus(builder.Append(value), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.Append(value), 
column.get_name(),
+                                             array_builder->type()->name()));
         }
     } else if constexpr (std::is_same_v<T, Decimal<Int64>>) {
         auto& builder = 
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
@@ -149,14 +150,14 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                 std::make_shared<arrow::Decimal128Type>(18, col.get_scale());
         for (size_t i = start; i < end; ++i) {
             if (null_map && (*null_map)[i]) {
-                checkArrowStatus(builder.AppendNull(), column.get_name(),
-                                 array_builder->type()->name());
+                RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                                 
array_builder->type()->name()));
                 continue;
             }
             Int128 p_value = Int128(col.get_element(i));
             arrow::Decimal128 value(reinterpret_cast<const 
uint8_t*>(&p_value));
-            checkArrowStatus(builder.Append(value), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.Append(value), 
column.get_name(),
+                                             array_builder->type()->name()));
         }
     } else if constexpr (std::is_same_v<T, Decimal256>) {
         auto& builder = 
reinterpret_cast<arrow::Decimal256Builder&>(*array_builder);
@@ -164,8 +165,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
                 std::make_shared<arrow::Decimal256Type>(76, col.get_scale());
         for (size_t i = start; i < end; ++i) {
             if (null_map && (*null_map)[i]) {
-                checkArrowStatus(builder.AppendNull(), column.get_name(),
-                                 array_builder->type()->name());
+                RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                                 
array_builder->type()->name()));
                 continue;
             }
             auto p_value = wide::Int256(col.get_element(i));
@@ -177,20 +178,20 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
 
             std::array<uint64_t, 4> word_array = {a0, a1, a2, a3};
             arrow::Decimal256 value(arrow::Decimal256::LittleEndianArray, 
word_array);
-            checkArrowStatus(builder.Append(value), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.Append(value), 
column.get_name(),
+                                             array_builder->type()->name()));
         }
     } else {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
-                               "write_column_to_arrow with type " + 
column.get_name());
+        return Status::InvalidArgument("write_column_to_arrow with type " + 
column.get_name());
     }
+    return Status::OK();
 }
 
 template <typename T>
-void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
-                                                     const arrow::Array* 
arrow_array, int64_t start,
-                                                     int64_t end,
-                                                     const cctz::time_zone& 
ctz) const {
+Status DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
+                                                       const arrow::Array* 
arrow_array,
+                                                       int64_t start, int64_t 
end,
+                                                       const cctz::time_zone& 
ctz) const {
     auto& column_data = static_cast<ColumnDecimal<T>&>(column).get_data();
     // Decimal<Int128> for decimalv2
     // Decimal<Int128I> for deicmalv3
@@ -234,9 +235,10 @@ void 
DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
             column_data.emplace_back(*reinterpret_cast<const 
T*>(concrete_array->Value(value_i)));
         }
     } else {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
-                               "read_column_from_arrow with type " + 
column.get_name());
+        return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+                             "read_column_from_arrow with type " + 
column.get_name());
     }
+    return Status::OK();
 }
 
 template <typename T>
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.h 
b/be/src/vec/data_types/serde/data_type_decimal_serde.h
index 2ade0705b9c..018a8791264 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.h
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.h
@@ -100,11 +100,11 @@ public:
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
                                  int64_t row_idx, bool col_const,
                                  const FormatOptions& options) const override;
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp 
b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
index 09b65597dbc..f558e2e827d 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
@@ -125,24 +125,25 @@ void DataTypeHLLSerDe::read_one_cell_from_jsonb(IColumn& 
column, const JsonbValu
     col.insert_value(hyper_log_log);
 }
 
-void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                             arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                             int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
+                                               arrow::ArrayBuilder* 
array_builder, int64_t start,
+                                               int64_t end, const 
cctz::time_zone& ctz) const {
     const auto& col = assert_cast<const ColumnHLL&>(column);
     auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
     for (size_t string_i = start; string_i < end; ++string_i) {
         if (null_map && (*null_map)[string_i]) {
-            checkArrowStatus(builder.AppendNull(), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                             array_builder->type()->name()));
         } else {
             auto& hll_value = 
const_cast<HyperLogLog&>(col.get_element(string_i));
             std::string memory_buffer(hll_value.max_serialized_size(), '0');
             hll_value.serialize((uint8_t*)memory_buffer.data());
-            checkArrowStatus(
+            RETURN_IF_ERROR(checkArrowStatus(
                     builder.Append(memory_buffer.data(), 
static_cast<int>(memory_buffer.size())),
-                    column.get_name(), array_builder->type()->name());
+                    column.get_name(), array_builder->type()->name()));
         }
     }
+    return Status::OK();
 }
 
 template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.h 
b/be/src/vec/data_types/serde/data_type_hll_serde.h
index b096ac49d30..18c669d0be0 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.h
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.h
@@ -52,13 +52,13 @@ public:
                                  int32_t col_id, int64_t row_num) const 
override;
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
-                               "read_column_from_arrow with type " + 
column.get_name());
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override {
+        return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+                             "read_column_from_arrow with type " + 
column.get_name());
     }
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp 
b/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp
index 904a1401a0d..0b00a10062c 100644
--- a/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp
@@ -123,28 +123,31 @@ Status DataTypeIPv4SerDe::read_column_from_pb(IColumn& 
column, const PValues& ar
     return Status::OK();
 }
 
-void DataTypeIPv4SerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                              arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                              int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeIPv4SerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
+                                                arrow::ArrayBuilder* 
array_builder, int64_t start,
+                                                int64_t end, const 
cctz::time_zone& ctz) const {
     const auto& col_data = assert_cast<const ColumnIPv4&>(column).get_data();
     auto& int32_builder = assert_cast<arrow::Int32Builder&>(*array_builder);
     auto arrow_null_map = revert_null_map(null_map, start, end);
     auto* arrow_null_map_data = arrow_null_map.empty() ? nullptr : 
arrow_null_map.data();
-    checkArrowStatus(int32_builder.AppendValues(
-                             reinterpret_cast<const Int32*>(col_data.data()) + 
start, end - start,
-                             reinterpret_cast<const 
uint8_t*>(arrow_null_map_data)),
-                     column.get_name(), array_builder->type()->name());
+    RETURN_IF_ERROR(checkArrowStatus(
+            int32_builder.AppendValues(reinterpret_cast<const 
Int32*>(col_data.data()) + start,
+                                       end - start,
+                                       reinterpret_cast<const 
uint8_t*>(arrow_null_map_data)),
+            column.get_name(), array_builder->type()->name()));
+    return Status::OK();
 }
 
-void DataTypeIPv4SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
-                                               int64_t start, int64_t end,
-                                               const cctz::time_zone& ctz) 
const {
+Status DataTypeIPv4SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                 int64_t start, int64_t end,
+                                                 const cctz::time_zone& ctz) 
const {
     auto& col_data = assert_cast<ColumnIPv4&>(column).get_data();
     int64_t row_count = end - start;
     /// buffers[0] is a null bitmap and buffers[1] are actual values
     std::shared_ptr<arrow::Buffer> buffer = arrow_array->data()->buffers[1];
     const auto* raw_data = reinterpret_cast<const UInt32*>(buffer->data()) + 
start;
     col_data.insert(raw_data, raw_data + row_count);
+    return Status::OK();
 }
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_ipv4_serde.h 
b/be/src/vec/data_types/serde/data_type_ipv4_serde.h
index 58c015fee41..b52b7e243fc 100644
--- a/be/src/vec/data_types/serde/data_type_ipv4_serde.h
+++ b/be/src/vec/data_types/serde/data_type_ipv4_serde.h
@@ -55,11 +55,11 @@ public:
     Status write_column_to_pb(const IColumn& column, PValues& result, int64_t 
start,
                               int64_t end) const override;
     Status read_column_from_pb(IColumn& column, const PValues& arg) const 
override;
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
 
 private:
     template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp 
b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
index 437c58cd2c9..d1baf9687ca 100644
--- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
@@ -146,27 +146,29 @@ Status DataTypeIPv6SerDe::read_column_from_pb(IColumn& 
column, const PValues& ar
     return Status::OK();
 }
 
-void DataTypeIPv6SerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                              arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                              int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeIPv6SerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
+                                                arrow::ArrayBuilder* 
array_builder, int64_t start,
+                                                int64_t end, const 
cctz::time_zone& ctz) const {
     const auto& col_data = assert_cast<const ColumnIPv6&>(column).get_data();
     auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder);
     for (size_t i = start; i < end; ++i) {
         if (null_map && (*null_map)[i]) {
-            checkArrowStatus(string_builder.AppendNull(), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(string_builder.AppendNull(), 
column.get_name(),
+                                             array_builder->type()->name()));
         } else {
             std::string ipv6_str = IPv6Value::to_string(col_data[i]);
-            checkArrowStatus(string_builder.Append(ipv6_str.c_str(),
-                                                   cast_set<int, size_t, 
false>(ipv6_str.size())),
-                             column.get_name(), array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(
+                    string_builder.Append(ipv6_str.c_str(),
+                                          cast_set<int, size_t, 
false>(ipv6_str.size())),
+                    column.get_name(), array_builder->type()->name()));
         }
     }
+    return Status::OK();
 }
 
-void DataTypeIPv6SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
-                                               int64_t start, int64_t end,
-                                               const cctz::time_zone& ctz) 
const {
+Status DataTypeIPv6SerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                 int64_t start, int64_t end,
+                                                 const cctz::time_zone& ctz) 
const {
     auto& col_data = assert_cast<ColumnIPv6&>(column).get_data();
     const auto* concrete_array = assert_cast<const 
arrow::StringArray*>(arrow_array);
     std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();
@@ -182,9 +184,9 @@ void DataTypeIPv6SerDe::read_column_from_arrow(IColumn& 
column, const arrow::Arr
             } else {
                 IPv6 ipv6_val;
                 if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len)) 
{
-                    throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
-                                           "parse number fail, string: '{}'",
-                                           std::string(raw_data, 
raw_data_len).c_str());
+                    return Status::Error(ErrorCode::INVALID_ARGUMENT,
+                                         "parse number fail, string: '{}'",
+                                         std::string(raw_data, 
raw_data_len).c_str());
                 }
                 col_data.emplace_back(ipv6_val);
             }
@@ -192,6 +194,7 @@ void DataTypeIPv6SerDe::read_column_from_arrow(IColumn& 
column, const arrow::Arr
             col_data.emplace_back(0);
         }
     }
+    return Status::OK();
 }
 
 Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone, 
const IColumn& column,
diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.h 
b/be/src/vec/data_types/serde/data_type_ipv6_serde.h
index 67e0dc9beac..24cd1e97f34 100644
--- a/be/src/vec/data_types/serde/data_type_ipv6_serde.h
+++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.h
@@ -63,11 +63,11 @@ public:
                                const NullMap* null_map, 
orc::ColumnVectorBatch* orc_col_batch,
                                int64_t start, int64_t end,
                                std::vector<StringRef>& buffer_list) const 
override;
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
     void write_one_cell_to_jsonb(const IColumn& column, 
JsonbWriterT<JsonbOutStream>& result,
                                  Arena* mem_pool, int unique_id, int64_t 
row_num) const override;
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp 
b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
index 79df9656a9f..c3258e2c914 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
@@ -113,24 +113,26 @@ Status 
DataTypeJsonbSerDe::deserialize_one_cell_from_json(IColumn& column, Slice
     return Status::OK();
 }
 
-void DataTypeJsonbSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                               arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                               int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeJsonbSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
+                                                 arrow::ArrayBuilder* 
array_builder, int64_t start,
+                                                 int64_t end, const 
cctz::time_zone& ctz) const {
     const auto& string_column = assert_cast<const ColumnString&>(column);
     auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
     for (size_t string_i = start; string_i < end; ++string_i) {
         if (null_map && (*null_map)[string_i]) {
-            checkArrowStatus(builder.AppendNull(), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                             array_builder->type()->name()));
             continue;
         }
         std::string_view string_ref = 
string_column.get_data_at(string_i).to_string_view();
         std::string json_string =
                 JsonbToJson::jsonb_to_json_string(string_ref.data(), 
string_ref.size());
-        checkArrowStatus(builder.Append(json_string.data(),
-                                        cast_set<int, size_t, 
false>(json_string.size())),
-                         column.get_name(), array_builder->type()->name());
+        RETURN_IF_ERROR(
+                checkArrowStatus(builder.Append(json_string.data(),
+                                                cast_set<int, size_t, 
false>(json_string.size())),
+                                 column.get_name(), 
array_builder->type()->name()));
     }
+    return Status::OK();
 }
 
 Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone, 
const IColumn& column,
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.h 
b/be/src/vec/data_types/serde/data_type_jsonb_serde.h
index f1fc1634b5c..c7acb045be6 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.h
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.h
@@ -43,9 +43,9 @@ public:
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& 
row_buffer,
                                  int64_t row_idx, bool col_const,
                                  const FormatOptions& options) const override;
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
 
     Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num, 
BufferWritable& bw,
                                       FormatOptions& options) const override;
diff --git a/be/src/vec/data_types/serde/data_type_map_serde.cpp 
b/be/src/vec/data_types/serde/data_type_map_serde.cpp
index d2c311b70d9..c4efda69349 100644
--- a/be/src/vec/data_types/serde/data_type_map_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_map_serde.cpp
@@ -330,9 +330,9 @@ void DataTypeMapSerDe::write_one_cell_to_jsonb(const 
IColumn& column, JsonbWrite
     result.writeEndBinary();
 }
 
-void DataTypeMapSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                             arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                             int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeMapSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
+                                               arrow::ArrayBuilder* 
array_builder, int64_t start,
+                                               int64_t end, const 
cctz::time_zone& ctz) const {
     auto& builder = assert_cast<arrow::MapBuilder&>(*array_builder);
     const auto& map_column = assert_cast<const ColumnMap&>(column);
     const IColumn& nested_keys_column = map_column.get_keys();
@@ -348,8 +348,8 @@ void DataTypeMapSerDe::write_column_to_arrow(const IColumn& 
column, const NullMa
 
     for (size_t r = start; r < end; ++r) {
         if ((null_map && (*null_map)[r])) {
-            checkArrowStatus(builder.AppendNull(), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                             array_builder->type()->name()));
         } else if (simd::contain_byte(keys_nullmap_data + offsets[r - 1],
                                       offsets[r] - offsets[r - 1], 1)) {
             // arrow do not support key is null, so we ignore the null 
key-value
@@ -357,31 +357,35 @@ void DataTypeMapSerDe::write_column_to_arrow(const 
IColumn& column, const NullMa
             MutableColumnPtr value_mutable_data = 
nested_values_column.clone_empty();
             for (size_t i = offsets[r - 1]; i < offsets[r]; ++i) {
                 if (keys_nullmap_data[i] == 1) {
-                    throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
-                                           "Can not write null value of map 
key to arrow.");
+                    return Status::Error(ErrorCode::INVALID_ARGUMENT,
+                                         "Can not write null value of map key 
to arrow.");
                 }
                 key_mutable_data->insert_from(nested_keys_column, i);
                 value_mutable_data->insert_from(nested_values_column, i);
             }
-            checkArrowStatus(builder.Append(), column.get_name(), 
array_builder->type()->name());
-
-            key_serde->write_column_to_arrow(*key_mutable_data, nullptr, 
key_builder, 0,
-                                             key_mutable_data->size(), ctz);
-            value_serde->write_column_to_arrow(*value_mutable_data, nullptr, 
value_builder, 0,
-                                               value_mutable_data->size(), 
ctz);
+            RETURN_IF_ERROR(checkArrowStatus(builder.Append(), 
column.get_name(),
+                                             array_builder->type()->name()));
+
+            RETURN_IF_ERROR(key_serde->write_column_to_arrow(
+                    *key_mutable_data, nullptr, key_builder, 0, 
key_mutable_data->size(), ctz));
+            
RETURN_IF_ERROR(value_serde->write_column_to_arrow(*value_mutable_data, nullptr,
+                                                               value_builder, 
0,
+                                                               
value_mutable_data->size(), ctz));
         } else {
-            checkArrowStatus(builder.Append(), column.get_name(), 
array_builder->type()->name());
-            key_serde->write_column_to_arrow(nested_keys_column, nullptr, 
key_builder,
-                                             offsets[r - 1], offsets[r], ctz);
-            value_serde->write_column_to_arrow(nested_values_column, nullptr, 
value_builder,
-                                               offsets[r - 1], offsets[r], 
ctz);
+            RETURN_IF_ERROR(checkArrowStatus(builder.Append(), 
column.get_name(),
+                                             array_builder->type()->name()));
+            RETURN_IF_ERROR(key_serde->write_column_to_arrow(
+                    nested_keys_column, nullptr, key_builder, offsets[r - 1], 
offsets[r], ctz));
+            RETURN_IF_ERROR(value_serde->write_column_to_arrow(
+                    nested_values_column, nullptr, value_builder, offsets[r - 
1], offsets[r], ctz));
         }
     }
+    return Status::OK();
 }
 
-void DataTypeMapSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
-                                              int64_t start, int64_t end,
-                                              const cctz::time_zone& ctz) 
const {
+Status DataTypeMapSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                int64_t start, int64_t end,
+                                                const cctz::time_zone& ctz) 
const {
     auto& column_map = static_cast<ColumnMap&>(column);
     auto& offsets_data = column_map.get_offsets();
     const auto* concrete_map = dynamic_cast<const 
arrow::MapArray*>(arrow_array);
@@ -394,10 +398,13 @@ void DataTypeMapSerDe::read_column_from_arrow(IColumn& 
column, const arrow::Arra
         // convert to doris offset, start from offsets.back()
         offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) - 
arrow_nested_start_offset);
     }
-    key_serde->read_column_from_arrow(column_map.get_keys(), 
concrete_map->keys().get(),
-                                      arrow_nested_start_offset, 
arrow_nested_end_offset, ctz);
-    value_serde->read_column_from_arrow(column_map.get_values(), 
concrete_map->items().get(),
-                                        arrow_nested_start_offset, 
arrow_nested_end_offset, ctz);
+    RETURN_IF_ERROR(key_serde->read_column_from_arrow(
+            column_map.get_keys(), concrete_map->keys().get(), 
arrow_nested_start_offset,
+            arrow_nested_end_offset, ctz));
+    RETURN_IF_ERROR(value_serde->read_column_from_arrow(
+            column_map.get_values(), concrete_map->items().get(), 
arrow_nested_start_offset,
+            arrow_nested_end_offset, ctz));
+    return Status::OK();
 }
 
 template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_map_serde.h 
b/be/src/vec/data_types/serde/data_type_map_serde.h
index 7472d599377..6a129a46e5c 100644
--- a/be/src/vec/data_types/serde/data_type_map_serde.h
+++ b/be/src/vec/data_types/serde/data_type_map_serde.h
@@ -72,11 +72,11 @@ public:
                                  int32_t col_id, int64_t row_num) const 
override;
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
                                  int64_t row_idx, bool col_const,
diff --git a/be/src/vec/data_types/serde/data_type_nothing_serde.h 
b/be/src/vec/data_types/serde/data_type_nothing_serde.h
index f31faa5c1e6..e31df4de419 100644
--- a/be/src/vec/data_types/serde/data_type_nothing_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nothing_serde.h
@@ -90,16 +90,16 @@ public:
                                "read_one_cell_from_jsonb with type " + 
column.get_name());
     }
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
-                               "write_column_to_arrow with type " + 
column.get_name());
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override {
+        return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+                             "write_column_to_arrow with type " + 
column.get_name());
     }
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
-                               "read_column_from_arrow with type " + 
column.get_name());
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override {
+        return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+                             "read_column_from_arrow with type " + 
column.get_name());
     }
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp 
b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
index b8151dc650c..55fc09e513b 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -288,18 +288,20 @@ void 
DataTypeNullableSerDe::read_one_cell_from_jsonb(IColumn& column, const Json
    1/ convert the null_map from doris to arrow null byte map
    2/ pass the arrow null byteamp to nested column , and call AppendValues
 **/
-void DataTypeNullableSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                                  arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                                  int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeNullableSerDe::write_column_to_arrow(const IColumn& column, 
const NullMap* null_map,
+                                                    arrow::ArrayBuilder* 
array_builder,
+                                                    int64_t start, int64_t end,
+                                                    const cctz::time_zone& 
ctz) const {
     const auto& column_nullable = assert_cast<const ColumnNullable&>(column);
-    nested_serde->write_column_to_arrow(column_nullable.get_nested_column(),
-                                        &column_nullable.get_null_map_data(), 
array_builder, start,
-                                        end, ctz);
+    return 
nested_serde->write_column_to_arrow(column_nullable.get_nested_column(),
+                                               
&column_nullable.get_null_map_data(), array_builder,
+                                               start, end, ctz);
 }
 
-void DataTypeNullableSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
-                                                   int64_t start, int64_t end,
-                                                   const cctz::time_zone& ctz) 
const {
+Status DataTypeNullableSerDe::read_column_from_arrow(IColumn& column,
+                                                     const arrow::Array* 
arrow_array, int64_t start,
+                                                     int64_t end,
+                                                     const cctz::time_zone& 
ctz) const {
     auto& col = reinterpret_cast<ColumnNullable&>(column);
     NullMap& map_data = col.get_null_map_data();
     for (auto i = start; i < end; ++i) {
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h 
b/be/src/vec/data_types/serde/data_type_nullable_serde.h
index 8ed0bb6826d..9f979120cea 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h
@@ -72,11 +72,11 @@ public:
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
                                  int64_t row_idx, bool col_const,
                                  const FormatOptions& options) const override;
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp 
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index 5888afab824..17b703f63ab 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -69,10 +69,10 @@ using DORIS_NUMERIC_ARROW_BUILDER =
                 >;
 
 template <PrimitiveType T>
-void DataTypeNumberSerDe<T>::write_column_to_arrow(const IColumn& column, 
const NullMap* null_map,
-                                                   arrow::ArrayBuilder* 
array_builder,
-                                                   int64_t start, int64_t end,
-                                                   const cctz::time_zone& ctz) 
const {
+Status DataTypeNumberSerDe<T>::write_column_to_arrow(const IColumn& column, 
const NullMap* null_map,
+                                                     arrow::ArrayBuilder* 
array_builder,
+                                                     int64_t start, int64_t 
end,
+                                                     const cctz::time_zone& 
ctz) const {
     auto& col_data = assert_cast<const ColumnType&>(column).get_data();
     using ARROW_BUILDER_TYPE =
             typename TypeMapLookup<typename 
PrimitiveTypeTraits<T>::ColumnItemType,
@@ -83,16 +83,16 @@ void DataTypeNumberSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
         auto* null_builder = dynamic_cast<arrow::NullBuilder*>(array_builder);
         if (null_builder) {
             for (size_t i = start; i < end; ++i) {
-                checkArrowStatus(null_builder->AppendNull(), column.get_name(),
-                                 null_builder->type()->name());
+                RETURN_IF_ERROR(checkArrowStatus(null_builder->AppendNull(), 
column.get_name(),
+                                                 
null_builder->type()->name()));
             }
         } else {
             auto& builder = assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
-            checkArrowStatus(
+            RETURN_IF_ERROR(checkArrowStatus(
                     builder.AppendValues(reinterpret_cast<const 
uint8_t*>(col_data.data() + start),
                                          end - start,
                                          reinterpret_cast<const 
uint8_t*>(arrow_null_map_data)),
-                    column.get_name(), array_builder->type()->name());
+                    column.get_name(), array_builder->type()->name()));
         }
 
     } else if constexpr (T == TYPE_LARGEINT) {
@@ -101,23 +101,24 @@ void DataTypeNumberSerDe<T>::write_column_to_arrow(const 
IColumn& column, const
             auto& data_value = col_data[i];
             std::string value_str = fmt::format("{}", data_value);
             if (null_map && (*null_map)[i]) {
-                checkArrowStatus(string_builder.AppendNull(), 
column.get_name(),
-                                 array_builder->type()->name());
+                RETURN_IF_ERROR(checkArrowStatus(string_builder.AppendNull(), 
column.get_name(),
+                                                 
array_builder->type()->name()));
             } else {
-                checkArrowStatus(
+                RETURN_IF_ERROR(checkArrowStatus(
                         string_builder.Append(value_str.data(),
                                               cast_set<int, size_t, 
false>(value_str.length())),
-                        column.get_name(), array_builder->type()->name());
+                        column.get_name(), array_builder->type()->name()));
             }
         }
     } else if constexpr (T == TYPE_IPV6) {
     } else {
         auto& builder = assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
-        checkArrowStatus(
+        RETURN_IF_ERROR(checkArrowStatus(
                 builder.AppendValues(col_data.data() + start, end - start,
                                      reinterpret_cast<const 
uint8_t*>(arrow_null_map_data)),
-                column.get_name(), array_builder->type()->name());
+                column.get_name(), array_builder->type()->name()));
     }
+    return Status::OK();
 }
 
 template <PrimitiveType T>
@@ -193,9 +194,10 @@ Status 
DataTypeNumberSerDe<T>::deserialize_column_from_json_vector(
 }
 
 template <PrimitiveType T>
-void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
-                                                    const arrow::Array* 
arrow_array, int64_t start,
-                                                    int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
+                                                      const arrow::Array* 
arrow_array,
+                                                      int64_t start, int64_t 
end,
+                                                      const cctz::time_zone& 
ctz) const {
     auto row_count = end - start;
     auto& col_data = static_cast<ColumnType&>(column).get_data();
 
@@ -205,7 +207,7 @@ void 
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
         for (size_t bool_i = 0; bool_i != 
static_cast<size_t>(concrete_array->length()); ++bool_i) {
             col_data.emplace_back(concrete_array->Value(bool_i));
         }
-        return;
+        return Status::OK();
     }
 
     // only for largeint(int128) type
@@ -224,9 +226,9 @@ void 
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
                     Int128 val = 0;
                     ReadBuffer rb(raw_data, raw_data_len);
                     if (!read_int_text_impl(val, rb)) {
-                        throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
-                                               "parse number fail, string: 
'{}'",
-                                               std::string(rb.position(), 
rb.count()).c_str());
+                        return Status::Error(ErrorCode::INVALID_ARGUMENT,
+                                             "parse number fail, string: '{}'",
+                                             std::string(rb.position(), 
rb.count()).c_str());
                     }
                     col_data.emplace_back(val);
                 }
@@ -234,7 +236,7 @@ void 
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
                 col_data.emplace_back(Int128()); // Int128() is NULL
             }
         }
-        return;
+        return Status::OK();
     }
 
     /// buffers[0] is a null bitmap and buffers[1] are actual values
@@ -243,6 +245,7 @@ void 
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
                                    buffer->data()) +
                            start;
     col_data.insert(raw_data, raw_data + row_count);
+    return Status::OK();
 }
 template <PrimitiveType T>
 Status DataTypeNumberSerDe<T>::deserialize_column_from_fixed_json(
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h 
b/be/src/vec/data_types/serde/data_type_number_serde.h
index b69629950d8..ffb5cd8eb21 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.h
+++ b/be/src/vec/data_types/serde/data_type_number_serde.h
@@ -86,11 +86,11 @@ public:
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
                                  int64_t row_idx, bool col_const,
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp 
b/be/src/vec/data_types/serde/data_type_object_serde.cpp
index d1e0084ecd8..42acec9bd3d 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -31,6 +31,7 @@
 #include "vec/common/schema_util.h"
 #include "vec/core/field.h"
 #include "vec/core/types.h"
+#include "vec/data_types/serde/data_type_serde.h"
 
 namespace doris {
 
@@ -144,26 +145,29 @@ Status 
DataTypeVariantSerDe::serialize_one_cell_to_json(const IColumn& column, i
     return Status::OK();
 }
 
-void DataTypeVariantSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                                 arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                                 int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeVariantSerDe::write_column_to_arrow(const IColumn& column, 
const NullMap* null_map,
+                                                   arrow::ArrayBuilder* 
array_builder,
+                                                   int64_t start, int64_t end,
+                                                   const cctz::time_zone& ctz) 
const {
     const auto* var = check_and_get_column<ColumnVariant>(column);
     auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
     for (size_t i = start; i < end; ++i) {
         if (null_map && (*null_map)[i]) {
-            checkArrowStatus(builder.AppendNull(), column.get_name(),
-                             array_builder->type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                             array_builder->type()->name()));
         } else {
             std::string serialized_value;
             if (!var->serialize_one_row_to_string(i, &serialized_value)) {
-                throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to 
serialize variant {}",
-                                       var->dump_structure());
+                return Status::Error(ErrorCode::INTERNAL_ERROR, "Failed to 
serialize variant {}",
+                                     var->dump_structure());
             }
-            checkArrowStatus(builder.Append(serialized_value.data(),
-                                            
static_cast<int>(serialized_value.size())),
-                             column.get_name(), array_builder->type()->name());
+            RETURN_IF_ERROR(
+                    checkArrowStatus(builder.Append(serialized_value.data(),
+                                                    
static_cast<int>(serialized_value.size())),
+                                     column.get_name(), 
array_builder->type()->name()));
         }
     }
+    return Status::OK();
 }
 
 Status DataTypeVariantSerDe::write_column_to_orc(const std::string& timezone, 
const IColumn& column,
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h 
b/be/src/vec/data_types/serde/data_type_object_serde.h
index 384bdf6727c..3c7a27ff48f 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.h
+++ b/be/src/vec/data_types/serde/data_type_object_serde.h
@@ -68,13 +68,13 @@ public:
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
-                               "read_column_from_arrow with type " + 
column.get_name());
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override {
+        return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+                             "read_column_from_arrow with type " + 
column.get_name());
     }
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h 
b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
index 8920b822d5f..adcbc2a5a5e 100644
--- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
+++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
@@ -114,29 +114,31 @@ public:
         col.insert_value(val);
     }
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override {
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override {
         const auto& col = assert_cast<const ColumnQuantileState&>(column);
         auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
         for (size_t string_i = start; string_i < end; ++string_i) {
             if (null_map && (*null_map)[string_i]) {
-                checkArrowStatus(builder.AppendNull(), column.get_name(),
-                                 array_builder->type()->name());
+                RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                                 
array_builder->type()->name()));
             } else {
                 auto& quantile_state_value = 
const_cast<QuantileState&>(col.get_element(string_i));
                 std::string 
memory_buffer(quantile_state_value.get_serialized_size(), '0');
                 quantile_state_value.serialize((uint8_t*)memory_buffer.data());
-                checkArrowStatus(builder.Append(memory_buffer.data(),
-                                                
static_cast<int>(memory_buffer.size())),
-                                 column.get_name(), 
array_builder->type()->name());
+                RETURN_IF_ERROR(
+                        checkArrowStatus(builder.Append(memory_buffer.data(),
+                                                        
static_cast<int>(memory_buffer.size())),
+                                         column.get_name(), 
array_builder->type()->name()));
             }
         }
+        return Status::OK();
     }
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
-                               "read_column_from_arrow with type " + 
column.get_name());
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override {
+        return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+                             "read_column_from_arrow with type " + 
column.get_name());
     }
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_serde.h 
b/be/src/vec/data_types/serde/data_type_serde.h
index a53c3dd5136..cc9baec4f1e 100644
--- a/be/src/vec/data_types/serde/data_type_serde.h
+++ b/be/src/vec/data_types/serde/data_type_serde.h
@@ -344,12 +344,12 @@ public:
     // JSON serializer and deserializer
 
     // Arrow serializer and deserializer
-    virtual void write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
-                                       arrow::ArrayBuilder* array_builder, 
int64_t start,
-                                       int64_t end, const cctz::time_zone& 
ctz) const = 0;
-    virtual void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array,
-                                        int64_t start, int64_t end,
-                                        const cctz::time_zone& ctz) const = 0;
+    virtual Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                         arrow::ArrayBuilder* array_builder, 
int64_t start,
+                                         int64_t end, const cctz::time_zone& 
ctz) const = 0;
+    virtual Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array,
+                                          int64_t start, int64_t end,
+                                          const cctz::time_zone& ctz) const = 
0;
 
     // ORC serializer
     virtual Status write_column_to_orc(const std::string& timezone, const 
IColumn& column,
@@ -405,13 +405,13 @@ inline static NullMap revert_null_map(const NullMap* 
null_bytemap, size_t start,
     return res;
 }
 
-inline void checkArrowStatus(const arrow::Status& status, const std::string& 
column,
-                             const std::string& format_name) {
+inline Status checkArrowStatus(const arrow::Status& status, const std::string& 
column,
+                               const std::string& format_name) {
     if (!status.ok()) {
-        throw Exception(
-                Status::FatalError("arrow serde with arrow: {} with column : 
{} with error msg: {}",
-                                   format_name, column, status.ToString()));
+        return Status::FatalError("arrow serde with arrow: {} with column : {} 
with error msg: {}",
+                                  format_name, column, status.ToString());
     }
+    return Status::OK();
 }
 
 DataTypeSerDeSPtrs create_data_type_serdes(
diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h 
b/be/src/vec/data_types/serde/data_type_string_serde.h
index 4af97453e00..09d6762e43c 100644
--- a/be/src/vec/data_types/serde/data_type_string_serde.h
+++ b/be/src/vec/data_types/serde/data_type_string_serde.h
@@ -271,25 +271,41 @@ public:
         assert_cast<ColumnType&>(column).insert_data(blob->getBlob(), 
blob->getBlobLen());
     }
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override {
+    template <typename BuilderType>
+    Status write_column_to_arrow_impl(const IColumn& column, const NullMap* 
null_map,
+                                      BuilderType& builder, int64_t start, 
int64_t end) const {
         const auto& string_column = assert_cast<const ColumnType&>(column);
-        auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
         for (size_t string_i = start; string_i < end; ++string_i) {
             if (null_map && (*null_map)[string_i]) {
-                checkArrowStatus(builder.AppendNull(), column.get_name(),
-                                 array_builder->type()->name());
+                RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
column.get_name(),
+                                                 builder.type()->name()));
                 continue;
             }
             auto string_ref = string_column.get_data_at(string_i);
-            checkArrowStatus(
+            RETURN_IF_ERROR(checkArrowStatus(
                     builder.Append(string_ref.data, cast_set<int, size_t, 
false>(string_ref.size)),
-                    column.get_name(), array_builder->type()->name());
+                    column.get_name(), builder.type()->name()));
+        }
+        return Status::OK();
+    }
+
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override {
+        if (array_builder->type()->id() == arrow::Type::LARGE_STRING) {
+            auto& builder = 
assert_cast<arrow::LargeStringBuilder&>(*array_builder);
+            return write_column_to_arrow_impl(column, null_map, builder, 
start, end);
+        } else if (array_builder->type()->id() == arrow::Type::STRING) {
+            auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+            return write_column_to_arrow_impl(column, null_map, builder, 
start, end);
+        } else {
+            return Status::InvalidArgument("Unsupported arrow type for string 
column: {}",
+                                           array_builder->type()->name());
         }
     }
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override {
+
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override {
         if (arrow_array->type_id() == arrow::Type::STRING ||
             arrow_array->type_id() == arrow::Type::BINARY) {
             const auto* concrete_array = dynamic_cast<const 
arrow::BinaryArray*>(arrow_array);
@@ -318,7 +334,25 @@ public:
                     assert_cast<ColumnType&>(column).insert_default();
                 }
             }
+        } else if (arrow_array->type_id() == arrow::Type::LARGE_STRING ||
+                   arrow_array->type_id() == arrow::Type::LARGE_BINARY) {
+            const auto* concrete_array = dynamic_cast<const 
arrow::LargeBinaryArray*>(arrow_array);
+            std::shared_ptr<arrow::Buffer> buffer = 
concrete_array->value_data();
+
+            for (auto offset_i = start; offset_i < end; ++offset_i) {
+                if (!concrete_array->IsNull(offset_i)) {
+                    const auto* raw_data = buffer->data() + 
concrete_array->value_offset(offset_i);
+                    assert_cast<ColumnType&>(column).insert_data(
+                            (char*)raw_data, 
concrete_array->value_length(offset_i));
+                } else {
+                    assert_cast<ColumnType&>(column).insert_default();
+                }
+            }
+        } else {
+            return Status::InvalidArgument("Unsupported arrow type for string 
column: {}",
+                                           arrow_array->type_id());
         }
+        return Status::OK();
     }
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.cpp 
b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
index dc314767a95..c86d0f9b259 100644
--- a/be/src/vec/data_types/serde/data_type_struct_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
@@ -24,6 +24,7 @@
 #include "vec/columns/column_const.h"
 #include "vec/columns/column_struct.h"
 #include "vec/common/string_ref.h"
+#include "vec/data_types/serde/data_type_serde.h"
 
 namespace doris {
 
@@ -321,36 +322,39 @@ void 
DataTypeStructSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbV
     column.deserialize_and_insert_from_arena(blob->getBlob());
 }
 
-void DataTypeStructSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
-                                                arrow::ArrayBuilder* 
array_builder, int64_t start,
-                                                int64_t end, const 
cctz::time_zone& ctz) const {
+Status DataTypeStructSerDe::write_column_to_arrow(const IColumn& column, const 
NullMap* null_map,
+                                                  arrow::ArrayBuilder* 
array_builder, int64_t start,
+                                                  int64_t end, const 
cctz::time_zone& ctz) const {
     auto& builder = assert_cast<arrow::StructBuilder&>(*array_builder);
     const auto& struct_column = assert_cast<const ColumnStruct&>(column);
     for (auto r = start; r < end; ++r) {
         if (null_map != nullptr && (*null_map)[r]) {
-            checkArrowStatus(builder.AppendNull(), struct_column.get_name(),
-                             builder.type()->name());
+            RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), 
struct_column.get_name(),
+                                             builder.type()->name()));
             continue;
         }
-        checkArrowStatus(builder.Append(), struct_column.get_name(), 
builder.type()->name());
+        RETURN_IF_ERROR(checkArrowStatus(builder.Append(), 
struct_column.get_name(),
+                                         builder.type()->name()));
         for (auto ei = 0; ei < struct_column.tuple_size(); ++ei) {
             auto* elem_builder = builder.field_builder(ei);
-            
elem_serdes_ptrs[ei]->write_column_to_arrow(struct_column.get_column(ei), 
nullptr,
-                                                        elem_builder, r, r + 
1, ctz);
+            RETURN_IF_ERROR(elem_serdes_ptrs[ei]->write_column_to_arrow(
+                    struct_column.get_column(ei), nullptr, elem_builder, r, r 
+ 1, ctz));
         }
     }
+    return Status::OK();
 }
 
-void DataTypeStructSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
-                                                 int64_t start, int64_t end,
-                                                 const cctz::time_zone& ctz) 
const {
+Status DataTypeStructSerDe::read_column_from_arrow(IColumn& column, const 
arrow::Array* arrow_array,
+                                                   int64_t start, int64_t end,
+                                                   const cctz::time_zone& ctz) 
const {
     auto& struct_column = static_cast<ColumnStruct&>(column);
     const auto* concrete_struct = dynamic_cast<const 
arrow::StructArray*>(arrow_array);
     DCHECK_EQ(struct_column.tuple_size(), concrete_struct->num_fields());
     for (auto i = 0; i < struct_column.tuple_size(); ++i) {
-        elem_serdes_ptrs[i]->read_column_from_arrow(
-                struct_column.get_column(i), concrete_struct->field(i).get(), 
start, end, ctz);
+        RETURN_IF_ERROR(elem_serdes_ptrs[i]->read_column_from_arrow(
+                struct_column.get_column(i), concrete_struct->field(i).get(), 
start, end, ctz));
     }
+    return Status::OK();
 }
 
 template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.h 
b/be/src/vec/data_types/serde/data_type_struct_serde.h
index a1f4cdd3b57..cd24b02a3ed 100644
--- a/be/src/vec/data_types/serde/data_type_struct_serde.h
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.h
@@ -147,11 +147,11 @@ public:
 
     void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
 
-    void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
-                               arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
-                               const cctz::time_zone& ctz) const override;
-    void read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
-                                int64_t end, const cctz::time_zone& ctz) const 
override;
+    Status write_column_to_arrow(const IColumn& column, const NullMap* 
null_map,
+                                 arrow::ArrayBuilder* array_builder, int64_t 
start, int64_t end,
+                                 const cctz::time_zone& ctz) const override;
+    Status read_column_from_arrow(IColumn& column, const arrow::Array* 
arrow_array, int64_t start,
+                                  int64_t end, const cctz::time_zone& ctz) 
const override;
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& 
row_buffer,
                                  int64_t row_idx, bool col_const,
diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp 
b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
index c6d9644e2dc..bb9e40b2e3e 100644
--- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
+++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
@@ -24,6 +24,7 @@
 #include "arrow/result.h"
 #include "arrow_pip_input_stream.h"
 #include "common/logging.h"
+#include "common/status.h"
 #include "io/fs/stream_load_pipe.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
@@ -99,8 +100,8 @@ Status ArrowStreamReader::get_next_block(Block* block, 
size_t* read_rows, bool*
             try {
                 const vectorized::ColumnWithTypeAndName& column_with_name =
                         block->get_by_name(column_name);
-                column_with_name.type->get_serde()->read_column_from_arrow(
-                        column_with_name.column->assume_mutable_ref(), column, 
0, num_rows, _ctzz);
+                
RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
+                        column_with_name.column->assume_mutable_ref(), column, 
0, num_rows, _ctzz));
             } catch (Exception& e) {
                 return Status::InternalError("Failed to convert from arrow to 
block: {}", e.what());
             }
diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp 
b/be/src/vec/utils/arrow_column_to_doris_column.cpp
index 99a9d70bc09..ed931af9477 100644
--- a/be/src/vec/utils/arrow_column_to_doris_column.cpp
+++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp
@@ -34,6 +34,7 @@
 #include "arrow/array/array_binary.h"
 #include "arrow/array/array_nested.h"
 #include "arrow/type.h"
+#include "common/status.h"
 #include "util/binary_cast.hpp"
 #include "util/timezone_utils.h"
 #include "vec/columns/column.h"
@@ -99,9 +100,9 @@ Status arrow_column_to_doris_column(const arrow::Array* 
arrow_column, size_t arr
 Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t 
arrow_batch_cur_idx,
                                     ColumnPtr& doris_column, const 
DataTypePtr& type,
                                     size_t num_elements, const 
cctz::time_zone& ctz) {
-    
type->get_serde()->read_column_from_arrow(doris_column->assume_mutable_ref(), 
arrow_column,
-                                              arrow_batch_cur_idx,
-                                              arrow_batch_cur_idx + 
num_elements, ctz);
+    RETURN_IF_ERROR(type->get_serde()->read_column_from_arrow(
+            doris_column->assume_mutable_ref(), arrow_column, 
arrow_batch_cur_idx,
+            arrow_batch_cur_idx + num_elements, ctz));
     return Status::OK();
 }
 
diff --git a/be/test/vec/data_types/common_data_type_serder_test.h 
b/be/test/vec/data_types/common_data_type_serder_test.h
index ef8d07323df..70b3f976e8b 100644
--- a/be/test/vec/data_types/common_data_type_serder_test.h
+++ b/be/test/vec/data_types/common_data_type_serder_test.h
@@ -392,8 +392,7 @@ public:
         auto rows = record_batch->num_rows();
         for (size_t i = 0; i < record_batch->num_columns(); ++i) {
             auto array = record_batch->column(i);
-            std::cout << "arrow record_batch pos: " << i << ", array: " << 
array->ToString()
-                      << std::endl;
+            std::cout << "arrow record_batch pos: " << i << std::endl;
             auto& column_with_type_and_name = new_block->get_by_position(i);
             std::cout << "now we are testing column: "
                       << column_with_type_and_name.column->get_name()
@@ -403,15 +402,17 @@ public:
                                                  
column_with_type_and_name.type, rows, "UTC");
             // do check data
             std::cout << "arrow_column_to_doris_column done, column data: "
-                      << column_with_type_and_name.to_string(0)
+                      << column_with_type_and_name.to_string(0).substr(0, 256)
                       << ", column size: " << 
column_with_type_and_name.column->size() << std::endl;
             EXPECT_EQ(Status::OK(), ret) << "convert arrow to block failed" << 
ret.to_string();
         }
         std::cout << "arrow deserialize block structure: " << 
new_block->dump_structure()
                   << std::endl;
         std::cout << "arrow deserialize block data: "
-                  << new_block->dump_data(
-                             0, std::min(static_cast<size_t>(rows), 
static_cast<size_t>(5)))
+                  << new_block
+                             ->dump_data(
+                                     0, std::min(static_cast<size_t>(rows), 
static_cast<size_t>(5)))
+                             .substr(0, 256)
                   << std::endl;
     }
 
diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp 
b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
index adc3841b98b..9f30a69eb3b 100644
--- a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
@@ -474,4 +474,24 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
     CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
 }
 
+TEST(DataTypeSerDeArrowTest, BigStringSerDeTest) {
+    std::string col_name = "big_string";
+    auto block = std::make_shared<Block>();
+    auto strcol = vectorized::ColumnString::create();
+    // 2G, if > 4G report string column length is too large: 
total_length=4402341462
+    for (int i = 0; i < 20; ++i) {
+        std::string is(107374182, '0'); // 100M
+        strcol->insert_data(is.c_str(), is.size());
+    }
+    vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeString>());
+    vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), 
data_type, col_name);
+    block->insert(type_and_name);
+
+    std::shared_ptr<arrow::RecordBatch> record_batch =
+            CommonDataTypeSerdeTest::serialize_arrow(block);
+    auto assert_block = std::make_shared<Block>(block->clone_empty());
+    CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+    CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to