This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d3a247eb1aa [fix](arrow-flight) Support SerDe of Doris string larger
than 2GB to Arrow large string (#51265)
d3a247eb1aa is described below
commit d3a247eb1aab710e55e619866024edf73e85811a
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu May 29 10:40:26 2025 +0800
[fix](arrow-flight) Support SerDe of Doris string larger than 2GB to Arrow
large string (#51265)
### What problem does this PR solve?
In Arrow, a `utf8` array uses 32-bit offsets, so it cannot hold 2GB or more of
string data; `large_utf8` uses 64-bit offsets and must be used for larger data.
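This fixes the following error, raised when a Doris String column is serialized to an Arrow `utf8` array: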
```
arrow serde with arrow: utf8 with column : String with error msg: Capacity
error: array cannot contain more than 2147483646 bytes
```
---
.../arrow_flight/arrow_flight_batch_reader.cpp | 12 +++-
.../arrow_flight/arrow_flight_batch_reader.h | 3 +
be/src/util/arrow/block_convertor.cpp | 14 +++--
be/src/util/arrow/row_batch.h | 2 +
be/src/util/arrow/utils.h | 10 ++++
.../vec/data_types/serde/data_type_array_serde.cpp | 25 ++++----
.../vec/data_types/serde/data_type_array_serde.h | 10 ++--
.../data_types/serde/data_type_bitmap_serde.cpp | 15 ++---
.../vec/data_types/serde/data_type_bitmap_serde.h | 16 +++---
.../data_types/serde/data_type_date64_serde.cpp | 48 ++++++++--------
.../vec/data_types/serde/data_type_date64_serde.h | 18 +++---
.../serde/data_type_datetimev2_serde.cpp | 32 ++++++-----
.../data_types/serde/data_type_datetimev2_serde.h | 10 ++--
.../data_types/serde/data_type_datev2_serde.cpp | 23 ++++----
.../vec/data_types/serde/data_type_datev2_serde.h | 10 ++--
.../data_types/serde/data_type_decimal_serde.cpp | 66 +++++++++++-----------
.../vec/data_types/serde/data_type_decimal_serde.h | 10 ++--
.../vec/data_types/serde/data_type_hll_serde.cpp | 15 ++---
be/src/vec/data_types/serde/data_type_hll_serde.h | 14 ++---
.../vec/data_types/serde/data_type_ipv4_serde.cpp | 23 ++++----
be/src/vec/data_types/serde/data_type_ipv4_serde.h | 10 ++--
.../vec/data_types/serde/data_type_ipv6_serde.cpp | 31 +++++-----
be/src/vec/data_types/serde/data_type_ipv6_serde.h | 10 ++--
.../vec/data_types/serde/data_type_jsonb_serde.cpp | 18 +++---
.../vec/data_types/serde/data_type_jsonb_serde.h | 6 +-
.../vec/data_types/serde/data_type_map_serde.cpp | 57 +++++++++++--------
be/src/vec/data_types/serde/data_type_map_serde.h | 10 ++--
.../vec/data_types/serde/data_type_nothing_serde.h | 18 +++---
.../data_types/serde/data_type_nullable_serde.cpp | 20 ++++---
.../data_types/serde/data_type_nullable_serde.h | 10 ++--
.../data_types/serde/data_type_number_serde.cpp | 47 +++++++--------
.../vec/data_types/serde/data_type_number_serde.h | 10 ++--
.../data_types/serde/data_type_object_serde.cpp | 24 ++++----
.../vec/data_types/serde/data_type_object_serde.h | 14 ++---
.../serde/data_type_quantilestate_serde.h | 26 +++++----
be/src/vec/data_types/serde/data_type_serde.h | 22 ++++----
.../vec/data_types/serde/data_type_string_serde.h | 54 ++++++++++++++----
.../data_types/serde/data_type_struct_serde.cpp | 30 +++++-----
.../vec/data_types/serde/data_type_struct_serde.h | 10 ++--
.../vec/exec/format/arrow/arrow_stream_reader.cpp | 5 +-
be/src/vec/utils/arrow_column_to_doris_column.cpp | 7 ++-
.../vec/data_types/common_data_type_serder_test.h | 11 ++--
.../serde/data_type_serde_arrow_test.cpp | 20 +++++++
43 files changed, 490 insertions(+), 356 deletions(-)
diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
index 969e743040c..07e46cfcfed 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
@@ -90,7 +90,7 @@ arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>>
ArrowFlightBatchLoca
return result;
}
-arrow::Status
ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out)
{
+arrow::Status
ArrowFlightBatchLocalReader::ReadNextImpl(std::shared_ptr<arrow::RecordBatch>*
out) {
// parameter *out not nullptr
*out = nullptr;
SCOPED_ATTACH_TASK(_mem_tracker);
@@ -124,6 +124,10 @@ arrow::Status
ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::Recor
return arrow::Status::OK();
}
+arrow::Status
ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out)
{
+ RETURN_ARROW_STATUS_IF_CATCH_EXCEPTION(ReadNextImpl(out));
+}
+
ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader(
const std::shared_ptr<QueryStatement>& statement,
const std::shared_ptr<PBackendService_Stub>& stub)
@@ -284,7 +288,7 @@ arrow::Status ArrowFlightBatchRemoteReader::init_schema() {
return arrow::Status::OK();
}
-arrow::Status
ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::RecordBatch>*
out) {
+arrow::Status
ArrowFlightBatchRemoteReader::ReadNextImpl(std::shared_ptr<arrow::RecordBatch>*
out) {
// parameter *out not nullptr
*out = nullptr;
SCOPED_ATTACH_TASK(_mem_tracker);
@@ -310,4 +314,8 @@ arrow::Status
ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::Reco
return arrow::Status::OK();
}
+arrow::Status
ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::RecordBatch>*
out) {
+ RETURN_ARROW_STATUS_IF_CATCH_EXCEPTION(ReadNextImpl(out));
+}
+
} // namespace doris::flight
diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.h
b/be/src/service/arrow_flight/arrow_flight_batch_reader.h
index 612ebc8063c..a1b25511b8c 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.h
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.h
@@ -77,6 +77,8 @@ private:
ArrowFlightBatchLocalReader(const std::shared_ptr<QueryStatement>&
statement,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<MemTrackerLimiter>&
mem_tracker);
+
+ arrow::Status ReadNextImpl(std::shared_ptr<arrow::RecordBatch>* out);
};
class ArrowFlightBatchRemoteReader : public ArrowFlightBatchReaderBase {
@@ -94,6 +96,7 @@ private:
ArrowFlightBatchRemoteReader(const std::shared_ptr<QueryStatement>&
statement,
const std::shared_ptr<PBackendService_Stub>&
stub);
+ arrow::Status ReadNextImpl(std::shared_ptr<arrow::RecordBatch>* out);
arrow::Status _fetch_schema();
arrow::Status _fetch_data();
diff --git a/be/src/util/arrow/block_convertor.cpp
b/be/src/util/arrow/block_convertor.cpp
index 17dd294b9e2..b13392360b2 100644
--- a/be/src/util/arrow/block_convertor.cpp
+++ b/be/src/util/arrow/block_convertor.cpp
@@ -97,17 +97,21 @@ Status
FromBlockConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) {
_cur_rows = _block.rows();
_cur_col = _block.get_by_position(idx).column;
_cur_type = _block.get_by_position(idx).type;
+ auto column = _cur_col->convert_to_full_column_if_const();
+ auto arrow_type = _schema->field(idx)->type();
+ if (arrow_type->name() == "utf8" && column->byte_size() >=
MAX_ARROW_UTF8) {
+ arrow_type = arrow::large_utf8();
+ }
std::unique_ptr<arrow::ArrayBuilder> builder;
- auto arrow_st = arrow::MakeBuilder(_pool, _schema->field(idx)->type(),
&builder);
+ auto arrow_st = arrow::MakeBuilder(_pool, arrow_type, &builder);
if (!arrow_st.ok()) {
return to_doris_status(arrow_st);
}
_cur_builder = builder.get();
- auto column = _cur_col->convert_to_full_column_if_const();
try {
- _cur_type->get_serde()->write_column_to_arrow(*column, nullptr,
_cur_builder,
- _cur_start,
_cur_start + _cur_rows,
- _timezone_obj);
+ RETURN_IF_ERROR(_cur_type->get_serde()->write_column_to_arrow(
+ *column, nullptr, _cur_builder, _cur_start, _cur_start +
_cur_rows,
+ _timezone_obj));
} catch (std::exception& e) {
return Status::InternalError(
"Fail to convert block data to arrow data, type: {}, name:
{}, error: {}",
diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h
index db7b70232d2..6dafbe8cab6 100644
--- a/be/src/util/arrow/row_batch.h
+++ b/be/src/util/arrow/row_batch.h
@@ -39,6 +39,8 @@ class Schema;
namespace doris {
+constexpr size_t MAX_ARROW_UTF8 = (1ULL << 21); // NOTE(review): 1ULL << 21 is 2 MiB, not 2G (2 GiB would be 1ULL << 31) -- confirm the intended threshold
+
class RowDescriptor;
Status convert_to_arrow_type(const vectorized::DataTypePtr& type,
diff --git a/be/src/util/arrow/utils.h b/be/src/util/arrow/utils.h
index c992dabee57..0a731bafbd5 100644
--- a/be/src/util/arrow/utils.h
+++ b/be/src/util/arrow/utils.h
@@ -55,6 +55,16 @@ namespace doris {
} \
} while (false)
+#define RETURN_ARROW_STATUS_IF_CATCH_EXCEPTION(stmt)
\
+ do {
\
+ try {
\
+ arrow::Status _status_ = (stmt);
\
+ return _status_;
\
+ } catch (const doris::Exception& e) {
\
+ return to_arrow_status(Status::Error<false>(e.code(),
e.to_string())); \
+ }
\
+ } while (0)
+
// Pretty print a arrow RecordBatch.
Status arrow_pretty_print(const arrow::RecordBatch& rb, std::ostream* os);
Status arrow_pretty_print(const arrow::Array& rb, std::ostream* os);
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp
b/be/src/vec/data_types/serde/data_type_array_serde.cpp
index ea748cdd6c4..dc90a67a74c 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp
@@ -271,9 +271,9 @@ void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn&
column, const JsonbVa
column.deserialize_and_insert_from_arena(blob->getBlob());
}
-void DataTypeArraySerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeArraySerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder, int64_t start,
+ int64_t end, const
cctz::time_zone& ctz) const {
const auto& array_column = static_cast<const ColumnArray&>(column);
const auto& offsets = array_column.get_offsets();
const auto& nested_data = array_column.get_data();
@@ -281,19 +281,22 @@ void DataTypeArraySerDe::write_column_to_arrow(const
IColumn& column, const Null
auto* nested_builder = builder.value_builder();
for (size_t array_idx = start; array_idx < end; ++array_idx) {
if (null_map && (*null_map)[array_idx]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+ array_builder->type()->name()));
continue;
}
- checkArrowStatus(builder.Append(), column.get_name(),
array_builder->type()->name());
- nested_serde->write_column_to_arrow(nested_data, nullptr,
nested_builder,
- offsets[array_idx - 1],
offsets[array_idx], ctz);
+ RETURN_IF_ERROR(checkArrowStatus(builder.Append(), column.get_name(),
+ array_builder->type()->name()));
+ RETURN_IF_ERROR(nested_serde->write_column_to_arrow(nested_data,
nullptr, nested_builder,
+ offsets[array_idx
- 1],
+
offsets[array_idx], ctz));
}
+ return Status::OK();
}
-void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz)
const {
+Status DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int64_t start, int64_t end,
+ const cctz::time_zone& ctz)
const {
auto& column_array = static_cast<ColumnArray&>(column);
auto& offsets_data = column_array.get_offsets();
const auto* concrete_array = dynamic_cast<const
arrow::ListArray*>(arrow_array);
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h
b/be/src/vec/data_types/serde/data_type_array_serde.h
index f9d852b3843..8378c79cae2 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.h
+++ b/be/src/vec/data_types/serde/data_type_array_serde.h
@@ -78,11 +78,11 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
int64_t row_idx, bool col_const,
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
index 2e4f8d72c6f..a506573531a 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
@@ -121,24 +121,25 @@ void DataTypeBitMapSerDe::write_one_cell_to_jsonb(const
IColumn& column, JsonbWr
result.writeEndBinary();
}
-void DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder, int64_t start,
+ int64_t end, const
cctz::time_zone& ctz) const {
const auto& col = assert_cast<const ColumnBitmap&>(column);
auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
for (size_t string_i = start; string_i < end; ++string_i) {
if (null_map && (*null_map)[string_i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+ array_builder->type()->name()));
} else {
auto& bitmap_value =
const_cast<BitmapValue&>(col.get_element(string_i));
std::string memory_buffer(bitmap_value.getSizeInBytes(), '0');
bitmap_value.write_to(memory_buffer.data());
- checkArrowStatus(
+ RETURN_IF_ERROR(checkArrowStatus(
builder.Append(memory_buffer.data(),
static_cast<int>(memory_buffer.size())),
- column.get_name(), array_builder->type()->name());
+ column.get_name(), array_builder->type()->name()));
}
}
+ return Status::OK();
}
void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const
JsonbValue* arg) const {
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
index 22c450cd27e..ab99157319a 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
@@ -57,14 +57,14 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
-
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "read_column_from_arrow with type " +
column.get_name());
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override {
+ return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+ "read_column_from_arrow with type " +
column.get_name());
}
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
index 9e3e12b0313..03a10b554c7 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
@@ -169,17 +169,18 @@ Status
DataTypeDateTimeSerDe::deserialize_one_cell_from_json(IColumn& column, Sl
return Status::OK();
}
-void DataTypeDateTimeSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz)
const {
- _read_column_from_arrow<false>(column, arrow_array, start, end, ctz);
+Status DataTypeDateTimeSerDe::read_column_from_arrow(IColumn& column,
+ const arrow::Array*
arrow_array, int64_t start,
+ int64_t end,
+ const cctz::time_zone&
ctz) const {
+ return _read_column_from_arrow<false>(column, arrow_array, start, end,
ctz);
}
template <PrimitiveType T>
-void DataTypeDate64SerDe<T>::write_column_to_arrow(const IColumn& column,
const NullMap* null_map,
- arrow::ArrayBuilder*
array_builder,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz)
const {
+Status DataTypeDate64SerDe<T>::write_column_to_arrow(const IColumn& column,
const NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder,
+ int64_t start, int64_t
end,
+ const cctz::time_zone&
ctz) const {
auto& col_data = static_cast<const
ColumnVector<Int64>&>(column).get_data();
auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder);
for (size_t i = start; i < end; ++i) {
@@ -187,13 +188,14 @@ void DataTypeDate64SerDe<T>::write_column_to_arrow(const
IColumn& column, const
const VecDateTimeValue* time_val = (const
VecDateTimeValue*)(&col_data[i]);
size_t len = time_val->to_buffer(buf);
if (null_map && (*null_map)[i]) {
- checkArrowStatus(string_builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(string_builder.AppendNull(),
column.get_name(),
+ array_builder->type()->name()));
} else {
- checkArrowStatus(string_builder.Append(buf,
cast_set<int32_t>(len)), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(string_builder.Append(buf,
cast_set<int32_t>(len)),
+ column.get_name(),
array_builder->type()->name()));
}
}
+ return Status::OK();
}
static int64_t time_unit_divisor(arrow::TimeUnit::type unit) {
@@ -218,10 +220,10 @@ static int64_t time_unit_divisor(arrow::TimeUnit::type
unit) {
template <PrimitiveType T>
template <bool is_date>
-void DataTypeDate64SerDe<T>::_read_column_from_arrow(IColumn& column,
- const arrow::Array*
arrow_array, int64_t start,
- int64_t end,
- const cctz::time_zone&
ctz) const {
+Status DataTypeDate64SerDe<T>::_read_column_from_arrow(IColumn& column,
+ const arrow::Array*
arrow_array,
+ int64_t start, int64_t
end,
+ const cctz::time_zone&
ctz) const {
auto& col_data = static_cast<ColumnVector<Int64>&>(column).get_data();
int64_t divisor = 1;
int64_t multiplier = 1;
@@ -271,16 +273,18 @@ void
DataTypeDate64SerDe<T>::_read_column_from_arrow(IColumn& column,
col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
}
} else {
- throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
- "Unsupported Arrow Type: " +
arrow_array->type()->name());
+ return Status::Error(doris::ErrorCode::INVALID_ARGUMENT,
+ "Unsupported Arrow Type: " +
arrow_array->type()->name());
}
+ return Status::OK();
}
template <PrimitiveType T>
-void DataTypeDate64SerDe<T>::read_column_from_arrow(IColumn& column,
- const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
- _read_column_from_arrow<true>(column, arrow_array, start, end, ctz);
+Status DataTypeDate64SerDe<T>::read_column_from_arrow(IColumn& column,
+ const arrow::Array*
arrow_array,
+ int64_t start, int64_t
end,
+ const cctz::time_zone&
ctz) const {
+ return _read_column_from_arrow<true>(column, arrow_array, start, end, ctz);
}
template <PrimitiveType T>
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.h
b/be/src/vec/data_types/serde/data_type_date64_serde.h
index caea5a3b551..944ef7bc977 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.h
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.h
@@ -60,11 +60,11 @@ public:
IColumn& column, std::vector<Slice>& slices, uint64_t*
num_deserialized,
const typename DataTypeNumberSerDe<T>::FormatOptions& options)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
Status write_column_to_mysql(
const IColumn& column, MysqlRowBuffer<true>& row_buffer, int64_t
row_idx,
bool col_const,
@@ -81,8 +81,8 @@ public:
protected:
template <bool is_date>
- void _read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz)
const;
+ Status _read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const;
private:
template <bool is_binary_format>
@@ -107,8 +107,8 @@ public:
Status deserialize_column_from_json_vector(IColumn& column,
std::vector<Slice>& slices,
uint64_t* num_deserialized,
const FormatOptions& options)
const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
index 7dcd2bba6ff..3cfa17244ad 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -108,16 +108,17 @@ Status
DataTypeDateTimeV2SerDe::deserialize_one_cell_from_json(IColumn& column,
return Status::OK();
}
-void DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column,
const NullMap* null_map,
- arrow::ArrayBuilder*
array_builder,
- int64_t start, int64_t end,
- const cctz::time_zone&
ctz) const {
+Status DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column,
+ const NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder,
+ int64_t start, int64_t
end,
+ const cctz::time_zone&
ctz) const {
const auto& col_data = static_cast<const
ColumnVector<UInt64>&>(column).get_data();
auto& timestamp_builder =
assert_cast<arrow::TimestampBuilder&>(*array_builder);
for (size_t i = start; i < end; ++i) {
if (null_map && (*null_map)[i]) {
- checkArrowStatus(timestamp_builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(timestamp_builder.AppendNull(),
column.get_name(),
+ array_builder->type()->name()));
} else {
int64_t timestamp = 0;
DateV2Value<DateTimeV2ValueType> datetime_val =
@@ -131,16 +132,17 @@ void DataTypeDateTimeV2SerDe::write_column_to_arrow(const
IColumn& column, const
uint32_t millisecond = datetime_val.microsecond() / 1000;
timestamp = (timestamp * 1000) + millisecond;
}
- checkArrowStatus(timestamp_builder.Append(timestamp),
column.get_name(),
- array_builder->type()->name());
+
RETURN_IF_ERROR(checkArrowStatus(timestamp_builder.Append(timestamp),
column.get_name(),
+ array_builder->type()->name()));
}
}
+ return Status::OK();
}
-void DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
- const arrow::Array*
arrow_array, int64_t start,
- int64_t end,
- const cctz::time_zone&
ctz) const {
+Status DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
+ const arrow::Array*
arrow_array,
+ int64_t start, int64_t
end,
+ const cctz::time_zone&
ctz) const {
auto& col_data = static_cast<ColumnDateTimeV2&>(column).get_data();
int64_t divisor = 1;
if (arrow_array->type()->id() == arrow::Type::TIMESTAMP) {
@@ -165,7 +167,8 @@ void
DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
}
default: {
LOG(WARNING) << "not support convert to datetimev2 from
time_unit:" << type->unit();
- return;
+ return Status::InvalidArgument("not support convert to datetimev2
from time_unit: {}",
+ type->unit());
}
}
for (auto value_i = start; value_i < end; ++value_i) {
@@ -184,7 +187,10 @@ void
DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
} else {
LOG(WARNING) << "not support convert to datetimev2 from arrow type:"
<< arrow_array->type()->id();
+ return Status::InternalError("not support convert to datetimev2 from
arrow type: {}",
+ arrow_array->type()->id());
}
+ return Status::OK();
}
template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
index 48154d0cae5..899e6e1b1e7 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
@@ -59,11 +59,11 @@ public:
uint64_t* num_deserialized,
const FormatOptions& options)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
int64_t row_idx, bool col_const,
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
index f1bcbdc266c..1c5109c9d27 100644
--- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
@@ -91,27 +91,29 @@ Status
DataTypeDateV2SerDe::deserialize_one_cell_from_json(IColumn& column, Slic
return Status::OK();
}
-void DataTypeDateV2SerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeDateV2SerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder, int64_t start,
+ int64_t end, const
cctz::time_zone& ctz) const {
const auto& col_data = static_cast<const
ColumnVector<UInt32>&>(column).get_data();
auto& date32_builder = assert_cast<arrow::Date32Builder&>(*array_builder);
for (size_t i = start; i < end; ++i) {
auto daynr = binary_cast<UInt32,
DateV2Value<DateV2ValueType>>(col_data[i]).daynr() -
date_threshold;
if (null_map && (*null_map)[i]) {
- checkArrowStatus(date32_builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(date32_builder.AppendNull(),
column.get_name(),
+ array_builder->type()->name()));
} else {
- checkArrowStatus(date32_builder.Append(cast_set<int, int64_t,
false>(daynr)),
- column.get_name(), array_builder->type()->name());
+ RETURN_IF_ERROR(
+ checkArrowStatus(date32_builder.Append(cast_set<int,
int64_t, false>(daynr)),
+ column.get_name(),
array_builder->type()->name()));
}
}
+ return Status::OK();
}
-void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz)
const {
+Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int64_t start, int64_t end,
+ const cctz::time_zone& ctz)
const {
auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data();
const auto* concrete_array = dynamic_cast<const
arrow::Date32Array*>(arrow_array);
for (auto value_i = start; value_i < end; ++value_i) {
@@ -119,6 +121,7 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn&
column, const arrow::A
v.get_date_from_daynr(concrete_array->Value(value_i) + date_threshold);
col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>,
UInt32>(v));
}
+ return Status::OK();
}
template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.h
b/be/src/vec/data_types/serde/data_type_datev2_serde.h
index 9d2d66384be..58f441e05f4 100644
--- a/be/src/vec/data_types/serde/data_type_datev2_serde.h
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.h
@@ -58,11 +58,11 @@ public:
uint64_t* num_deserialized,
const FormatOptions& options)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const override;
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
index 891491b09f1..0569438cdf6 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
@@ -87,10 +87,11 @@ Status
DataTypeDecimalSerDe<T>::deserialize_one_cell_from_json(IColumn& column,
}
template <typename T>
-void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column,
const NullMap* null_map,
- arrow::ArrayBuilder*
array_builder,
- int64_t start, int64_t end,
- const cctz::time_zone&
ctz) const {
+Status DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column,
+ const NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder,
+ int64_t start, int64_t
end,
+ const cctz::time_zone&
ctz) const {
auto& col = reinterpret_cast<const ColumnDecimal<T>&>(column);
if constexpr (std::is_same_v<T, Decimal<Int128>>) {
auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
@@ -98,8 +99,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
std::make_shared<arrow::Decimal128Type>(27, 9);
for (size_t i = start; i < end; ++i) {
if (null_map && (*null_map)[i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+
array_builder->type()->name()));
continue;
}
const auto& data_ref = col.get_data_at(i);
@@ -107,8 +108,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
int64_t high = (p_value->value) >> 64;
uint64 low = cast_set<uint64>((p_value->value) &
0xFFFFFFFFFFFFFFFF);
arrow::Decimal128 value(high, low);
- checkArrowStatus(builder.Append(value), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.Append(value),
column.get_name(),
+ array_builder->type()->name()));
}
} else if constexpr (std::is_same_v<T, Decimal128V3>) {
auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
@@ -116,8 +117,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
std::make_shared<arrow::Decimal128Type>(38, col.get_scale());
for (size_t i = start; i < end; ++i) {
if (null_map && (*null_map)[i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+
array_builder->type()->name()));
continue;
}
const auto& data_ref = col.get_data_at(i);
@@ -125,8 +126,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
int64_t high = (p_value->value) >> 64;
uint64 low = cast_set<uint64>((p_value->value) &
0xFFFFFFFFFFFFFFFF);
arrow::Decimal128 value(high, low);
- checkArrowStatus(builder.Append(value), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.Append(value),
column.get_name(),
+ array_builder->type()->name()));
}
} else if constexpr (std::is_same_v<T, Decimal<Int32>>) {
auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
@@ -134,14 +135,14 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
std::make_shared<arrow::Decimal128Type>(8, col.get_scale());
for (size_t i = start; i < end; ++i) {
if (null_map && (*null_map)[i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+
array_builder->type()->name()));
continue;
}
Int128 p_value = Int128(col.get_element(i));
arrow::Decimal128 value(reinterpret_cast<const
uint8_t*>(&p_value));
- checkArrowStatus(builder.Append(value), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.Append(value),
column.get_name(),
+ array_builder->type()->name()));
}
} else if constexpr (std::is_same_v<T, Decimal<Int64>>) {
auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
@@ -149,14 +150,14 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
std::make_shared<arrow::Decimal128Type>(18, col.get_scale());
for (size_t i = start; i < end; ++i) {
if (null_map && (*null_map)[i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+
array_builder->type()->name()));
continue;
}
Int128 p_value = Int128(col.get_element(i));
arrow::Decimal128 value(reinterpret_cast<const
uint8_t*>(&p_value));
- checkArrowStatus(builder.Append(value), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.Append(value),
column.get_name(),
+ array_builder->type()->name()));
}
} else if constexpr (std::is_same_v<T, Decimal256>) {
auto& builder =
reinterpret_cast<arrow::Decimal256Builder&>(*array_builder);
@@ -164,8 +165,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
std::make_shared<arrow::Decimal256Type>(76, col.get_scale());
for (size_t i = start; i < end; ++i) {
if (null_map && (*null_map)[i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+
array_builder->type()->name()));
continue;
}
auto p_value = wide::Int256(col.get_element(i));
@@ -177,20 +178,20 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
std::array<uint64_t, 4> word_array = {a0, a1, a2, a3};
arrow::Decimal256 value(arrow::Decimal256::LittleEndianArray,
word_array);
- checkArrowStatus(builder.Append(value), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.Append(value),
column.get_name(),
+ array_builder->type()->name()));
}
} else {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "write_column_to_arrow with type " +
column.get_name());
+ return Status::InvalidArgument("write_column_to_arrow with type " +
column.get_name());
}
+ return Status::OK();
}
template <typename T>
-void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
- const arrow::Array*
arrow_array, int64_t start,
- int64_t end,
- const cctz::time_zone&
ctz) const {
+Status DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
+ const arrow::Array*
arrow_array,
+ int64_t start, int64_t
end,
+ const cctz::time_zone&
ctz) const {
auto& column_data = static_cast<ColumnDecimal<T>&>(column).get_data();
// Decimal<Int128> for decimalv2
// Decimal<Int128I> for deicmalv3
@@ -234,9 +235,10 @@ void
DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
column_data.emplace_back(*reinterpret_cast<const
T*>(concrete_array->Value(value_i)));
}
} else {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "read_column_from_arrow with type " +
column.get_name());
+ return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+ "read_column_from_arrow with type " +
column.get_name());
}
+ return Status::OK();
}
template <typename T>
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.h
b/be/src/vec/data_types/serde/data_type_decimal_serde.h
index 2ade0705b9c..018a8791264 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.h
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.h
@@ -100,11 +100,11 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const override;
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
index 09b65597dbc..f558e2e827d 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
@@ -125,24 +125,25 @@ void DataTypeHLLSerDe::read_one_cell_from_jsonb(IColumn&
column, const JsonbValu
col.insert_value(hyper_log_log);
}
-void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder, int64_t start,
+ int64_t end, const
cctz::time_zone& ctz) const {
const auto& col = assert_cast<const ColumnHLL&>(column);
auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
for (size_t string_i = start; string_i < end; ++string_i) {
if (null_map && (*null_map)[string_i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+ array_builder->type()->name()));
} else {
auto& hll_value =
const_cast<HyperLogLog&>(col.get_element(string_i));
std::string memory_buffer(hll_value.max_serialized_size(), '0');
hll_value.serialize((uint8_t*)memory_buffer.data());
- checkArrowStatus(
+ RETURN_IF_ERROR(checkArrowStatus(
builder.Append(memory_buffer.data(),
static_cast<int>(memory_buffer.size())),
- column.get_name(), array_builder->type()->name());
+ column.get_name(), array_builder->type()->name()));
}
}
+ return Status::OK();
}
template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.h
b/be/src/vec/data_types/serde/data_type_hll_serde.h
index b096ac49d30..18c669d0be0 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.h
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.h
@@ -52,13 +52,13 @@ public:
int32_t col_id, int64_t row_num) const
override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "read_column_from_arrow with type " +
column.get_name());
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override {
+ return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+ "read_column_from_arrow with type " +
column.get_name());
}
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp
b/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp
index 904a1401a0d..0b00a10062c 100644
--- a/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp
@@ -123,28 +123,31 @@ Status DataTypeIPv4SerDe::read_column_from_pb(IColumn&
column, const PValues& ar
return Status::OK();
}
-void DataTypeIPv4SerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeIPv4SerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder, int64_t start,
+ int64_t end, const
cctz::time_zone& ctz) const {
const auto& col_data = assert_cast<const ColumnIPv4&>(column).get_data();
auto& int32_builder = assert_cast<arrow::Int32Builder&>(*array_builder);
auto arrow_null_map = revert_null_map(null_map, start, end);
auto* arrow_null_map_data = arrow_null_map.empty() ? nullptr :
arrow_null_map.data();
- checkArrowStatus(int32_builder.AppendValues(
- reinterpret_cast<const Int32*>(col_data.data()) +
start, end - start,
- reinterpret_cast<const
uint8_t*>(arrow_null_map_data)),
- column.get_name(), array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(
+ int32_builder.AppendValues(reinterpret_cast<const
Int32*>(col_data.data()) + start,
+ end - start,
+ reinterpret_cast<const
uint8_t*>(arrow_null_map_data)),
+ column.get_name(), array_builder->type()->name()));
+ return Status::OK();
}
-void DataTypeIPv4SerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz)
const {
+Status DataTypeIPv4SerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int64_t start, int64_t end,
+ const cctz::time_zone& ctz)
const {
auto& col_data = assert_cast<ColumnIPv4&>(column).get_data();
int64_t row_count = end - start;
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = arrow_array->data()->buffers[1];
const auto* raw_data = reinterpret_cast<const UInt32*>(buffer->data()) +
start;
col_data.insert(raw_data, raw_data + row_count);
+ return Status::OK();
}
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_ipv4_serde.h
b/be/src/vec/data_types/serde/data_type_ipv4_serde.h
index 58c015fee41..b52b7e243fc 100644
--- a/be/src/vec/data_types/serde/data_type_ipv4_serde.h
+++ b/be/src/vec/data_types/serde/data_type_ipv4_serde.h
@@ -55,11 +55,11 @@ public:
Status write_column_to_pb(const IColumn& column, PValues& result, int64_t
start,
int64_t end) const override;
Status read_column_from_pb(IColumn& column, const PValues& arg) const
override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
private:
template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
index 437c58cd2c9..d1baf9687ca 100644
--- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
@@ -146,27 +146,29 @@ Status DataTypeIPv6SerDe::read_column_from_pb(IColumn&
column, const PValues& ar
return Status::OK();
}
-void DataTypeIPv6SerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeIPv6SerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder, int64_t start,
+ int64_t end, const
cctz::time_zone& ctz) const {
const auto& col_data = assert_cast<const ColumnIPv6&>(column).get_data();
auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder);
for (size_t i = start; i < end; ++i) {
if (null_map && (*null_map)[i]) {
- checkArrowStatus(string_builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(string_builder.AppendNull(),
column.get_name(),
+ array_builder->type()->name()));
} else {
std::string ipv6_str = IPv6Value::to_string(col_data[i]);
- checkArrowStatus(string_builder.Append(ipv6_str.c_str(),
- cast_set<int, size_t,
false>(ipv6_str.size())),
- column.get_name(), array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(
+ string_builder.Append(ipv6_str.c_str(),
+ cast_set<int, size_t,
false>(ipv6_str.size())),
+ column.get_name(), array_builder->type()->name()));
}
}
+ return Status::OK();
}
-void DataTypeIPv6SerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz)
const {
+Status DataTypeIPv6SerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int64_t start, int64_t end,
+ const cctz::time_zone& ctz)
const {
auto& col_data = assert_cast<ColumnIPv6&>(column).get_data();
const auto* concrete_array = assert_cast<const
arrow::StringArray*>(arrow_array);
std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();
@@ -182,9 +184,9 @@ void DataTypeIPv6SerDe::read_column_from_arrow(IColumn&
column, const arrow::Arr
} else {
IPv6 ipv6_val;
if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len))
{
- throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
- "parse number fail, string: '{}'",
- std::string(raw_data,
raw_data_len).c_str());
+ return Status::Error(ErrorCode::INVALID_ARGUMENT,
+ "parse number fail, string: '{}'",
+ std::string(raw_data,
raw_data_len).c_str());
}
col_data.emplace_back(ipv6_val);
}
@@ -192,6 +194,7 @@ void DataTypeIPv6SerDe::read_column_from_arrow(IColumn&
column, const arrow::Arr
col_data.emplace_back(0);
}
}
+ return Status::OK();
}
Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone,
const IColumn& column,
diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.h
b/be/src/vec/data_types/serde/data_type_ipv6_serde.h
index 67e0dc9beac..24cd1e97f34 100644
--- a/be/src/vec/data_types/serde/data_type_ipv6_serde.h
+++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.h
@@ -63,11 +63,11 @@ public:
const NullMap* null_map,
orc::ColumnVectorBatch* orc_col_batch,
int64_t start, int64_t end,
std::vector<StringRef>& buffer_list) const
override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
void write_one_cell_to_jsonb(const IColumn& column,
JsonbWriterT<JsonbOutStream>& result,
Arena* mem_pool, int unique_id, int64_t
row_num) const override;
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
index 79df9656a9f..c3258e2c914 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
@@ -113,24 +113,26 @@ Status
DataTypeJsonbSerDe::deserialize_one_cell_from_json(IColumn& column, Slice
return Status::OK();
}
-void DataTypeJsonbSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeJsonbSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder, int64_t start,
+ int64_t end, const
cctz::time_zone& ctz) const {
const auto& string_column = assert_cast<const ColumnString&>(column);
auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
for (size_t string_i = start; string_i < end; ++string_i) {
if (null_map && (*null_map)[string_i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+ array_builder->type()->name()));
continue;
}
std::string_view string_ref =
string_column.get_data_at(string_i).to_string_view();
std::string json_string =
JsonbToJson::jsonb_to_json_string(string_ref.data(),
string_ref.size());
- checkArrowStatus(builder.Append(json_string.data(),
- cast_set<int, size_t,
false>(json_string.size())),
- column.get_name(), array_builder->type()->name());
+ RETURN_IF_ERROR(
+ checkArrowStatus(builder.Append(json_string.data(),
+ cast_set<int, size_t,
false>(json_string.size())),
+ column.get_name(),
array_builder->type()->name()));
}
+ return Status::OK();
}
Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone,
const IColumn& column,
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.h
b/be/src/vec/data_types/serde/data_type_jsonb_serde.h
index f1fc1634b5c..c7acb045be6 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.h
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.h
@@ -43,9 +43,9 @@ public:
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>&
row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num,
BufferWritable& bw,
FormatOptions& options) const override;
diff --git a/be/src/vec/data_types/serde/data_type_map_serde.cpp
b/be/src/vec/data_types/serde/data_type_map_serde.cpp
index d2c311b70d9..c4efda69349 100644
--- a/be/src/vec/data_types/serde/data_type_map_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_map_serde.cpp
@@ -330,9 +330,9 @@ void DataTypeMapSerDe::write_one_cell_to_jsonb(const
IColumn& column, JsonbWrite
result.writeEndBinary();
}
-void DataTypeMapSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeMapSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder, int64_t start,
+ int64_t end, const
cctz::time_zone& ctz) const {
auto& builder = assert_cast<arrow::MapBuilder&>(*array_builder);
const auto& map_column = assert_cast<const ColumnMap&>(column);
const IColumn& nested_keys_column = map_column.get_keys();
@@ -348,8 +348,8 @@ void DataTypeMapSerDe::write_column_to_arrow(const IColumn&
column, const NullMa
for (size_t r = start; r < end; ++r) {
if ((null_map && (*null_map)[r])) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+ array_builder->type()->name()));
} else if (simd::contain_byte(keys_nullmap_data + offsets[r - 1],
offsets[r] - offsets[r - 1], 1)) {
// arrow do not support key is null, so we ignore the null
key-value
@@ -357,31 +357,35 @@ void DataTypeMapSerDe::write_column_to_arrow(const
IColumn& column, const NullMa
MutableColumnPtr value_mutable_data =
nested_values_column.clone_empty();
for (size_t i = offsets[r - 1]; i < offsets[r]; ++i) {
if (keys_nullmap_data[i] == 1) {
- throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
- "Can not write null value of map
key to arrow.");
+ return Status::Error(ErrorCode::INVALID_ARGUMENT,
+ "Can not write null value of map key
to arrow.");
}
key_mutable_data->insert_from(nested_keys_column, i);
value_mutable_data->insert_from(nested_values_column, i);
}
- checkArrowStatus(builder.Append(), column.get_name(),
array_builder->type()->name());
-
- key_serde->write_column_to_arrow(*key_mutable_data, nullptr,
key_builder, 0,
- key_mutable_data->size(), ctz);
- value_serde->write_column_to_arrow(*value_mutable_data, nullptr,
value_builder, 0,
- value_mutable_data->size(),
ctz);
+ RETURN_IF_ERROR(checkArrowStatus(builder.Append(),
column.get_name(),
+ array_builder->type()->name()));
+
+ RETURN_IF_ERROR(key_serde->write_column_to_arrow(
+ *key_mutable_data, nullptr, key_builder, 0,
key_mutable_data->size(), ctz));
+
RETURN_IF_ERROR(value_serde->write_column_to_arrow(*value_mutable_data, nullptr,
+ value_builder,
0,
+
value_mutable_data->size(), ctz));
} else {
- checkArrowStatus(builder.Append(), column.get_name(),
array_builder->type()->name());
- key_serde->write_column_to_arrow(nested_keys_column, nullptr,
key_builder,
- offsets[r - 1], offsets[r], ctz);
- value_serde->write_column_to_arrow(nested_values_column, nullptr,
value_builder,
- offsets[r - 1], offsets[r],
ctz);
+ RETURN_IF_ERROR(checkArrowStatus(builder.Append(),
column.get_name(),
+ array_builder->type()->name()));
+ RETURN_IF_ERROR(key_serde->write_column_to_arrow(
+ nested_keys_column, nullptr, key_builder, offsets[r - 1],
offsets[r], ctz));
+ RETURN_IF_ERROR(value_serde->write_column_to_arrow(
+ nested_values_column, nullptr, value_builder, offsets[r -
1], offsets[r], ctz));
}
}
+ return Status::OK();
}
-void DataTypeMapSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz)
const {
+Status DataTypeMapSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int64_t start, int64_t end,
+ const cctz::time_zone& ctz)
const {
auto& column_map = static_cast<ColumnMap&>(column);
auto& offsets_data = column_map.get_offsets();
const auto* concrete_map = dynamic_cast<const
arrow::MapArray*>(arrow_array);
@@ -394,10 +398,13 @@ void DataTypeMapSerDe::read_column_from_arrow(IColumn&
column, const arrow::Arra
// convert to doris offset, start from offsets.back()
offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) -
arrow_nested_start_offset);
}
- key_serde->read_column_from_arrow(column_map.get_keys(),
concrete_map->keys().get(),
- arrow_nested_start_offset,
arrow_nested_end_offset, ctz);
- value_serde->read_column_from_arrow(column_map.get_values(),
concrete_map->items().get(),
- arrow_nested_start_offset,
arrow_nested_end_offset, ctz);
+ RETURN_IF_ERROR(key_serde->read_column_from_arrow(
+ column_map.get_keys(), concrete_map->keys().get(),
arrow_nested_start_offset,
+ arrow_nested_end_offset, ctz));
+ RETURN_IF_ERROR(value_serde->read_column_from_arrow(
+ column_map.get_values(), concrete_map->items().get(),
arrow_nested_start_offset,
+ arrow_nested_end_offset, ctz));
+ return Status::OK();
}
template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_map_serde.h
b/be/src/vec/data_types/serde/data_type_map_serde.h
index 7472d599377..6a129a46e5c 100644
--- a/be/src/vec/data_types/serde/data_type_map_serde.h
+++ b/be/src/vec/data_types/serde/data_type_map_serde.h
@@ -72,11 +72,11 @@ public:
int32_t col_id, int64_t row_num) const
override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
int64_t row_idx, bool col_const,
diff --git a/be/src/vec/data_types/serde/data_type_nothing_serde.h
b/be/src/vec/data_types/serde/data_type_nothing_serde.h
index f31faa5c1e6..e31df4de419 100644
--- a/be/src/vec/data_types/serde/data_type_nothing_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nothing_serde.h
@@ -90,16 +90,16 @@ public:
"read_one_cell_from_jsonb with type " +
column.get_name());
}
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "write_column_to_arrow with type " +
column.get_name());
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override {
+ return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+ "write_column_to_arrow with type " +
column.get_name());
}
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "read_column_from_arrow with type " +
column.get_name());
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override {
+ return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+ "read_column_from_arrow with type " +
column.get_name());
}
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
index b8151dc650c..55fc09e513b 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -288,18 +288,20 @@ void
DataTypeNullableSerDe::read_one_cell_from_jsonb(IColumn& column, const Json
1/ convert the null_map from doris to arrow null byte map
2/ pass the arrow null byteamp to nested column , and call AppendValues
**/
-void DataTypeNullableSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeNullableSerDe::write_column_to_arrow(const IColumn& column,
const NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder,
+ int64_t start, int64_t end,
+ const cctz::time_zone&
ctz) const {
const auto& column_nullable = assert_cast<const ColumnNullable&>(column);
- nested_serde->write_column_to_arrow(column_nullable.get_nested_column(),
- &column_nullable.get_null_map_data(),
array_builder, start,
- end, ctz);
+ return
nested_serde->write_column_to_arrow(column_nullable.get_nested_column(),
+
&column_nullable.get_null_map_data(), array_builder,
+ start, end, ctz);
}
-void DataTypeNullableSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz)
const {
+Status DataTypeNullableSerDe::read_column_from_arrow(IColumn& column,
+ const arrow::Array*
arrow_array, int64_t start,
+ int64_t end,
+ const cctz::time_zone&
ctz) const {
auto& col = reinterpret_cast<ColumnNullable&>(column);
NullMap& map_data = col.get_null_map_data();
for (auto i = start; i < end; ++i) {
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h
b/be/src/vec/data_types/serde/data_type_nullable_serde.h
index 8ed0bb6826d..9f979120cea 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h
@@ -72,11 +72,11 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const override;
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index 5888afab824..17b703f63ab 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -69,10 +69,10 @@ using DORIS_NUMERIC_ARROW_BUILDER =
>;
template <PrimitiveType T>
-void DataTypeNumberSerDe<T>::write_column_to_arrow(const IColumn& column,
const NullMap* null_map,
- arrow::ArrayBuilder*
array_builder,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz)
const {
+Status DataTypeNumberSerDe<T>::write_column_to_arrow(const IColumn& column,
const NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder,
+ int64_t start, int64_t
end,
+ const cctz::time_zone&
ctz) const {
auto& col_data = assert_cast<const ColumnType&>(column).get_data();
using ARROW_BUILDER_TYPE =
typename TypeMapLookup<typename
PrimitiveTypeTraits<T>::ColumnItemType,
@@ -83,16 +83,16 @@ void DataTypeNumberSerDe<T>::write_column_to_arrow(const
IColumn& column, const
auto* null_builder = dynamic_cast<arrow::NullBuilder*>(array_builder);
if (null_builder) {
for (size_t i = start; i < end; ++i) {
- checkArrowStatus(null_builder->AppendNull(), column.get_name(),
- null_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(null_builder->AppendNull(),
column.get_name(),
+
null_builder->type()->name()));
}
} else {
auto& builder = assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
- checkArrowStatus(
+ RETURN_IF_ERROR(checkArrowStatus(
builder.AppendValues(reinterpret_cast<const
uint8_t*>(col_data.data() + start),
end - start,
reinterpret_cast<const
uint8_t*>(arrow_null_map_data)),
- column.get_name(), array_builder->type()->name());
+ column.get_name(), array_builder->type()->name()));
}
} else if constexpr (T == TYPE_LARGEINT) {
@@ -101,23 +101,24 @@ void DataTypeNumberSerDe<T>::write_column_to_arrow(const
IColumn& column, const
auto& data_value = col_data[i];
std::string value_str = fmt::format("{}", data_value);
if (null_map && (*null_map)[i]) {
- checkArrowStatus(string_builder.AppendNull(),
column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(string_builder.AppendNull(),
column.get_name(),
+
array_builder->type()->name()));
} else {
- checkArrowStatus(
+ RETURN_IF_ERROR(checkArrowStatus(
string_builder.Append(value_str.data(),
cast_set<int, size_t,
false>(value_str.length())),
- column.get_name(), array_builder->type()->name());
+ column.get_name(), array_builder->type()->name()));
}
}
} else if constexpr (T == TYPE_IPV6) {
} else {
auto& builder = assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
- checkArrowStatus(
+ RETURN_IF_ERROR(checkArrowStatus(
builder.AppendValues(col_data.data() + start, end - start,
reinterpret_cast<const
uint8_t*>(arrow_null_map_data)),
- column.get_name(), array_builder->type()->name());
+ column.get_name(), array_builder->type()->name()));
}
+ return Status::OK();
}
template <PrimitiveType T>
@@ -193,9 +194,10 @@ Status
DataTypeNumberSerDe<T>::deserialize_column_from_json_vector(
}
template <PrimitiveType T>
-void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
- const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
+ const arrow::Array*
arrow_array,
+ int64_t start, int64_t
end,
+ const cctz::time_zone&
ctz) const {
auto row_count = end - start;
auto& col_data = static_cast<ColumnType&>(column).get_data();
@@ -205,7 +207,7 @@ void
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
for (size_t bool_i = 0; bool_i !=
static_cast<size_t>(concrete_array->length()); ++bool_i) {
col_data.emplace_back(concrete_array->Value(bool_i));
}
- return;
+ return Status::OK();
}
// only for largeint(int128) type
@@ -224,9 +226,9 @@ void
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
Int128 val = 0;
ReadBuffer rb(raw_data, raw_data_len);
if (!read_int_text_impl(val, rb)) {
- throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
- "parse number fail, string:
'{}'",
- std::string(rb.position(),
rb.count()).c_str());
+ return Status::Error(ErrorCode::INVALID_ARGUMENT,
+ "parse number fail, string: '{}'",
+ std::string(rb.position(),
rb.count()).c_str());
}
col_data.emplace_back(val);
}
@@ -234,7 +236,7 @@ void
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
col_data.emplace_back(Int128()); // Int128() is NULL
}
}
- return;
+ return Status::OK();
}
/// buffers[0] is a null bitmap and buffers[1] are actual values
@@ -243,6 +245,7 @@ void
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
buffer->data()) +
start;
col_data.insert(raw_data, raw_data + row_count);
+ return Status::OK();
}
template <PrimitiveType T>
Status DataTypeNumberSerDe<T>::deserialize_column_from_fixed_json(
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h b/be/src/vec/data_types/serde/data_type_number_serde.h
index b69629950d8..ffb5cd8eb21 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.h
+++ b/be/src/vec/data_types/serde/data_type_number_serde.h
@@ -86,11 +86,11 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
int64_t row_idx, bool col_const,
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp
index d1e0084ecd8..42acec9bd3d 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -31,6 +31,7 @@
#include "vec/common/schema_util.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
+#include "vec/data_types/serde/data_type_serde.h"
namespace doris {
@@ -144,26 +145,29 @@ Status
DataTypeVariantSerDe::serialize_one_cell_to_json(const IColumn& column, i
return Status::OK();
}
-void DataTypeVariantSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeVariantSerDe::write_column_to_arrow(const IColumn& column,
const NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder,
+ int64_t start, int64_t end,
+ const cctz::time_zone& ctz)
const {
const auto* var = check_and_get_column<ColumnVariant>(column);
auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
for (size_t i = start; i < end; ++i) {
if (null_map && (*null_map)[i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+ array_builder->type()->name()));
} else {
std::string serialized_value;
if (!var->serialize_one_row_to_string(i, &serialized_value)) {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to
serialize variant {}",
- var->dump_structure());
+ return Status::Error(ErrorCode::INTERNAL_ERROR, "Failed to
serialize variant {}",
+ var->dump_structure());
}
- checkArrowStatus(builder.Append(serialized_value.data(),
-
static_cast<int>(serialized_value.size())),
- column.get_name(), array_builder->type()->name());
+ RETURN_IF_ERROR(
+ checkArrowStatus(builder.Append(serialized_value.data(),
+
static_cast<int>(serialized_value.size())),
+ column.get_name(),
array_builder->type()->name()));
}
}
+ return Status::OK();
}
Status DataTypeVariantSerDe::write_column_to_orc(const std::string& timezone,
const IColumn& column,
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h
index 384bdf6727c..3c7a27ff48f 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.h
+++ b/be/src/vec/data_types/serde/data_type_object_serde.h
@@ -68,13 +68,13 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "read_column_from_arrow with type " +
column.get_name());
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override {
+ return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+ "read_column_from_arrow with type " +
column.get_name());
}
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
index 8920b822d5f..adcbc2a5a5e 100644
--- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
+++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
@@ -114,29 +114,31 @@ public:
col.insert_value(val);
}
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override {
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override {
const auto& col = assert_cast<const ColumnQuantileState&>(column);
auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
for (size_t string_i = start; string_i < end; ++string_i) {
if (null_map && (*null_map)[string_i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+
array_builder->type()->name()));
} else {
auto& quantile_state_value =
const_cast<QuantileState&>(col.get_element(string_i));
std::string
memory_buffer(quantile_state_value.get_serialized_size(), '0');
quantile_state_value.serialize((uint8_t*)memory_buffer.data());
- checkArrowStatus(builder.Append(memory_buffer.data(),
-
static_cast<int>(memory_buffer.size())),
- column.get_name(),
array_builder->type()->name());
+ RETURN_IF_ERROR(
+ checkArrowStatus(builder.Append(memory_buffer.data(),
+
static_cast<int>(memory_buffer.size())),
+ column.get_name(),
array_builder->type()->name()));
}
}
+ return Status::OK();
}
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "read_column_from_arrow with type " +
column.get_name());
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override {
+ return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
+ "read_column_from_arrow with type " +
column.get_name());
}
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h
index a53c3dd5136..cc9baec4f1e 100644
--- a/be/src/vec/data_types/serde/data_type_serde.h
+++ b/be/src/vec/data_types/serde/data_type_serde.h
@@ -344,12 +344,12 @@ public:
// JSON serializer and deserializer
// Arrow serializer and deserializer
- virtual void write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
- arrow::ArrayBuilder* array_builder,
int64_t start,
- int64_t end, const cctz::time_zone&
ctz) const = 0;
- virtual void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz) const = 0;
+ virtual Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder,
int64_t start,
+ int64_t end, const cctz::time_zone&
ctz) const = 0;
+ virtual Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array,
+ int64_t start, int64_t end,
+ const cctz::time_zone& ctz) const =
0;
// ORC serializer
virtual Status write_column_to_orc(const std::string& timezone, const
IColumn& column,
@@ -405,13 +405,13 @@ inline static NullMap revert_null_map(const NullMap*
null_bytemap, size_t start,
return res;
}
-inline void checkArrowStatus(const arrow::Status& status, const std::string&
column,
- const std::string& format_name) {
+inline Status checkArrowStatus(const arrow::Status& status, const std::string&
column,
+ const std::string& format_name) {
if (!status.ok()) {
- throw Exception(
- Status::FatalError("arrow serde with arrow: {} with column :
{} with error msg: {}",
- format_name, column, status.ToString()));
+ return Status::FatalError("arrow serde with arrow: {} with column : {}
with error msg: {}",
+ format_name, column, status.ToString());
}
+ return Status::OK();
}
DataTypeSerDeSPtrs create_data_type_serdes(
diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h b/be/src/vec/data_types/serde/data_type_string_serde.h
index 4af97453e00..09d6762e43c 100644
--- a/be/src/vec/data_types/serde/data_type_string_serde.h
+++ b/be/src/vec/data_types/serde/data_type_string_serde.h
@@ -271,25 +271,41 @@ public:
assert_cast<ColumnType&>(column).insert_data(blob->getBlob(),
blob->getBlobLen());
}
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override {
+ template <typename BuilderType>
+ Status write_column_to_arrow_impl(const IColumn& column, const NullMap*
null_map,
+ BuilderType& builder, int64_t start,
int64_t end) const {
const auto& string_column = assert_cast<const ColumnType&>(column);
- auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
for (size_t string_i = start; string_i < end; ++string_i) {
if (null_map && (*null_map)[string_i]) {
- checkArrowStatus(builder.AppendNull(), column.get_name(),
- array_builder->type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
column.get_name(),
+ builder.type()->name()));
continue;
}
auto string_ref = string_column.get_data_at(string_i);
- checkArrowStatus(
+ RETURN_IF_ERROR(checkArrowStatus(
builder.Append(string_ref.data, cast_set<int, size_t,
false>(string_ref.size)),
- column.get_name(), array_builder->type()->name());
+ column.get_name(), builder.type()->name()));
+ }
+ return Status::OK();
+ }
+
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override {
+ if (array_builder->type()->id() == arrow::Type::LARGE_STRING) {
+ auto& builder =
assert_cast<arrow::LargeStringBuilder&>(*array_builder);
+ return write_column_to_arrow_impl(column, null_map, builder,
start, end);
+ } else if (array_builder->type()->id() == arrow::Type::STRING) {
+ auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+ return write_column_to_arrow_impl(column, null_map, builder,
start, end);
+ } else {
+ return Status::InvalidArgument("Unsupported arrow type for string
column: {}",
+ array_builder->type()->name());
}
}
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override {
+
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override {
if (arrow_array->type_id() == arrow::Type::STRING ||
arrow_array->type_id() == arrow::Type::BINARY) {
const auto* concrete_array = dynamic_cast<const
arrow::BinaryArray*>(arrow_array);
@@ -318,7 +334,25 @@ public:
assert_cast<ColumnType&>(column).insert_default();
}
}
+ } else if (arrow_array->type_id() == arrow::Type::LARGE_STRING ||
+ arrow_array->type_id() == arrow::Type::LARGE_BINARY) {
+ const auto* concrete_array = dynamic_cast<const
arrow::LargeBinaryArray*>(arrow_array);
+ std::shared_ptr<arrow::Buffer> buffer =
concrete_array->value_data();
+
+ for (auto offset_i = start; offset_i < end; ++offset_i) {
+ if (!concrete_array->IsNull(offset_i)) {
+ const auto* raw_data = buffer->data() +
concrete_array->value_offset(offset_i);
+ assert_cast<ColumnType&>(column).insert_data(
+ (char*)raw_data,
concrete_array->value_length(offset_i));
+ } else {
+ assert_cast<ColumnType&>(column).insert_default();
+ }
+ }
+ } else {
+ return Status::InvalidArgument("Unsupported arrow type for string
column: {}",
+ arrow_array->type_id());
}
+ return Status::OK();
}
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.cpp b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
index dc314767a95..c86d0f9b259 100644
--- a/be/src/vec/data_types/serde/data_type_struct_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
@@ -24,6 +24,7 @@
#include "vec/columns/column_const.h"
#include "vec/columns/column_struct.h"
#include "vec/common/string_ref.h"
+#include "vec/data_types/serde/data_type_serde.h"
namespace doris {
@@ -321,36 +322,39 @@ void
DataTypeStructSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbV
column.deserialize_and_insert_from_arena(blob->getBlob());
}
-void DataTypeStructSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
- arrow::ArrayBuilder*
array_builder, int64_t start,
- int64_t end, const
cctz::time_zone& ctz) const {
+Status DataTypeStructSerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
+ arrow::ArrayBuilder*
array_builder, int64_t start,
+ int64_t end, const
cctz::time_zone& ctz) const {
auto& builder = assert_cast<arrow::StructBuilder&>(*array_builder);
const auto& struct_column = assert_cast<const ColumnStruct&>(column);
for (auto r = start; r < end; ++r) {
if (null_map != nullptr && (*null_map)[r]) {
- checkArrowStatus(builder.AppendNull(), struct_column.get_name(),
- builder.type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(),
struct_column.get_name(),
+ builder.type()->name()));
continue;
}
- checkArrowStatus(builder.Append(), struct_column.get_name(),
builder.type()->name());
+ RETURN_IF_ERROR(checkArrowStatus(builder.Append(),
struct_column.get_name(),
+ builder.type()->name()));
for (auto ei = 0; ei < struct_column.tuple_size(); ++ei) {
auto* elem_builder = builder.field_builder(ei);
-
elem_serdes_ptrs[ei]->write_column_to_arrow(struct_column.get_column(ei),
nullptr,
- elem_builder, r, r +
1, ctz);
+ RETURN_IF_ERROR(elem_serdes_ptrs[ei]->write_column_to_arrow(
+ struct_column.get_column(ei), nullptr, elem_builder, r, r
+ 1, ctz));
}
}
+ return Status::OK();
}
-void DataTypeStructSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
- int64_t start, int64_t end,
- const cctz::time_zone& ctz)
const {
+Status DataTypeStructSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int64_t start, int64_t end,
+ const cctz::time_zone& ctz)
const {
auto& struct_column = static_cast<ColumnStruct&>(column);
const auto* concrete_struct = dynamic_cast<const
arrow::StructArray*>(arrow_array);
DCHECK_EQ(struct_column.tuple_size(), concrete_struct->num_fields());
for (auto i = 0; i < struct_column.tuple_size(); ++i) {
- elem_serdes_ptrs[i]->read_column_from_arrow(
- struct_column.get_column(i), concrete_struct->field(i).get(),
start, end, ctz);
+ RETURN_IF_ERROR(elem_serdes_ptrs[i]->read_column_from_arrow(
+ struct_column.get_column(i), concrete_struct->field(i).get(),
start, end, ctz));
}
+ return Status::OK();
}
template <bool is_binary_format>
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.h b/be/src/vec/data_types/serde/data_type_struct_serde.h
index a1f4cdd3b57..cd24b02a3ed 100644
--- a/be/src/vec/data_types/serde/data_type_struct_serde.h
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.h
@@ -147,11 +147,11 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
- void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
- arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
- const cctz::time_zone& ctz) const override;
- void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
- int64_t end, const cctz::time_zone& ctz) const
override;
+ Status write_column_to_arrow(const IColumn& column, const NullMap*
null_map,
+ arrow::ArrayBuilder* array_builder, int64_t
start, int64_t end,
+ const cctz::time_zone& ctz) const override;
+ Status read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int64_t start,
+ int64_t end, const cctz::time_zone& ctz)
const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>&
row_buffer,
int64_t row_idx, bool col_const,
diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
index c6d9644e2dc..bb9e40b2e3e 100644
--- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
+++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
@@ -24,6 +24,7 @@
#include "arrow/result.h"
#include "arrow_pip_input_stream.h"
#include "common/logging.h"
+#include "common/status.h"
#include "io/fs/stream_load_pipe.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
@@ -99,8 +100,8 @@ Status ArrowStreamReader::get_next_block(Block* block,
size_t* read_rows, bool*
try {
const vectorized::ColumnWithTypeAndName& column_with_name =
block->get_by_name(column_name);
- column_with_name.type->get_serde()->read_column_from_arrow(
- column_with_name.column->assume_mutable_ref(), column,
0, num_rows, _ctzz);
+
RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
+ column_with_name.column->assume_mutable_ref(), column,
0, num_rows, _ctzz));
} catch (Exception& e) {
return Status::InternalError("Failed to convert from arrow to
block: {}", e.what());
}
diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp b/be/src/vec/utils/arrow_column_to_doris_column.cpp
index 99a9d70bc09..ed931af9477 100644
--- a/be/src/vec/utils/arrow_column_to_doris_column.cpp
+++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp
@@ -34,6 +34,7 @@
#include "arrow/array/array_binary.h"
#include "arrow/array/array_nested.h"
#include "arrow/type.h"
+#include "common/status.h"
#include "util/binary_cast.hpp"
#include "util/timezone_utils.h"
#include "vec/columns/column.h"
@@ -99,9 +100,9 @@ Status arrow_column_to_doris_column(const arrow::Array*
arrow_column, size_t arr
Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t
arrow_batch_cur_idx,
ColumnPtr& doris_column, const
DataTypePtr& type,
size_t num_elements, const
cctz::time_zone& ctz) {
-
type->get_serde()->read_column_from_arrow(doris_column->assume_mutable_ref(),
arrow_column,
- arrow_batch_cur_idx,
- arrow_batch_cur_idx +
num_elements, ctz);
+ RETURN_IF_ERROR(type->get_serde()->read_column_from_arrow(
+ doris_column->assume_mutable_ref(), arrow_column,
arrow_batch_cur_idx,
+ arrow_batch_cur_idx + num_elements, ctz));
return Status::OK();
}
diff --git a/be/test/vec/data_types/common_data_type_serder_test.h b/be/test/vec/data_types/common_data_type_serder_test.h
index ef8d07323df..70b3f976e8b 100644
--- a/be/test/vec/data_types/common_data_type_serder_test.h
+++ b/be/test/vec/data_types/common_data_type_serder_test.h
@@ -392,8 +392,7 @@ public:
auto rows = record_batch->num_rows();
for (size_t i = 0; i < record_batch->num_columns(); ++i) {
auto array = record_batch->column(i);
- std::cout << "arrow record_batch pos: " << i << ", array: " <<
array->ToString()
- << std::endl;
+ std::cout << "arrow record_batch pos: " << i << std::endl;
auto& column_with_type_and_name = new_block->get_by_position(i);
std::cout << "now we are testing column: "
<< column_with_type_and_name.column->get_name()
@@ -403,15 +402,17 @@ public:
column_with_type_and_name.type, rows, "UTC");
// do check data
std::cout << "arrow_column_to_doris_column done, column data: "
- << column_with_type_and_name.to_string(0)
+ << column_with_type_and_name.to_string(0).substr(0, 256)
<< ", column size: " <<
column_with_type_and_name.column->size() << std::endl;
EXPECT_EQ(Status::OK(), ret) << "convert arrow to block failed" <<
ret.to_string();
}
std::cout << "arrow deserialize block structure: " <<
new_block->dump_structure()
<< std::endl;
std::cout << "arrow deserialize block data: "
- << new_block->dump_data(
- 0, std::min(static_cast<size_t>(rows),
static_cast<size_t>(5)))
+ << new_block
+ ->dump_data(
+ 0, std::min(static_cast<size_t>(rows),
static_cast<size_t>(5)))
+ .substr(0, 256)
<< std::endl;
}
diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
index adc3841b98b..9f30a69eb3b 100644
--- a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
@@ -474,4 +474,24 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
}
+TEST(DataTypeSerDeArrowTest, BigStringSerDeTest) {
+ std::string col_name = "big_string";
+ auto block = std::make_shared<Block>();
+ auto strcol = vectorized::ColumnString::create();
+ // 2G, if > 4G report string column length is too large:
total_length=4402341462
+ for (int i = 0; i < 20; ++i) {
+ std::string is(107374182, '0'); // 100M
+ strcol->insert_data(is.c_str(), is.size());
+ }
+ vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeString>());
+ vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(),
data_type, col_name);
+ block->insert(type_and_name);
+
+ std::shared_ptr<arrow::RecordBatch> record_batch =
+ CommonDataTypeSerdeTest::serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+ CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]