This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f999b823fc [feature](array) support array for apache arrow convertor (#17682)
f999b823fc is described below
commit f999b823fce0e92f80e45c88bb871907b2371612
Author: Kang <[email protected]>
AuthorDate: Tue Mar 14 17:53:16 2023 +0800
[feature](array) support array for apache arrow convertor (#17682)
* support array type for arrow
* fix builder.Append() for each array row
* fix array child column append start offset
---
be/src/util/arrow/block_convertor.cpp | 126 ++++++++++++++++++++++++++--------
be/src/util/arrow/row_batch.cpp | 28 ++++++++
2 files changed, 126 insertions(+), 28 deletions(-)
diff --git a/be/src/util/arrow/block_convertor.cpp b/be/src/util/arrow/block_convertor.cpp
index d4c4f2f6ba..3fbfcbd0ed 100644
--- a/be/src/util/arrow/block_convertor.cpp
+++ b/be/src/util/arrow/block_convertor.cpp
@@ -18,6 +18,7 @@
#include "util/arrow/block_convertor.h"
#include <arrow/array.h>
+#include <arrow/array/builder_nested.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/buffer.h>
#include <arrow/builder.h>
@@ -42,6 +43,11 @@
#include "runtime/large_int_value.h"
#include "util/arrow/utils.h"
#include "util/types.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
+#include "vec/common/assert_cast.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_map.h"
namespace doris {
@@ -81,10 +87,11 @@ public:
// process string-transformable field
arrow::Status Visit(const arrow::StringType& type) override {
- arrow::StringBuilder builder(_pool);
- size_t num_rows = _block.rows();
+ auto& builder = assert_cast<arrow::StringBuilder&>(*_cur_builder);
+ size_t start = _cur_start;
+ size_t num_rows = _cur_rows;
ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = 0; i < num_rows; ++i) {
+ for (size_t i = start; i < start + num_rows; ++i) {
bool is_null = _cur_col->is_null_at(i);
if (is_null) {
ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -151,20 +158,21 @@ public:
}
}
}
- return builder.Finish(&_arrays[_cur_field_idx]);
+ return arrow::Status::OK();
}
// process doris Decimal
arrow::Status Visit(const arrow::Decimal128Type& type) override {
- size_t num_rows = _block.rows();
+ auto& builder = assert_cast<arrow::Decimal128Builder&>(*_cur_builder);
+ size_t start = _cur_start;
+ size_t num_rows = _cur_rows;
if (auto* decimalv2_column = vectorized::check_and_get_column<
vectorized::ColumnDecimal<vectorized::Decimal128>>(
*vectorized::remove_nullable(_cur_col))) {
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(27, 9);
- arrow::Decimal128Builder builder(s_decimal_ptr, _pool);
ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = 0; i < num_rows; ++i) {
+ for (size_t i = start; i < start + num_rows; ++i) {
bool is_null = _cur_col->is_null_at(i);
if (is_null) {
ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -177,15 +185,14 @@ public:
arrow::Decimal128 value(high, low);
ARROW_RETURN_NOT_OK(builder.Append(value));
}
- return builder.Finish(&_arrays[_cur_field_idx]);
+ return arrow::Status::OK();
} else if (auto* decimal128_column = vectorized::check_and_get_column<
vectorized::ColumnDecimal<vectorized::Decimal128I>>(
*vectorized::remove_nullable(_cur_col))) {
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(38,
decimal128_column->get_scale());
- arrow::Decimal128Builder builder(s_decimal_ptr, _pool);
ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = 0; i < num_rows; ++i) {
+ for (size_t i = start; i < start + num_rows; ++i) {
bool is_null = _cur_col->is_null_at(i);
if (is_null) {
ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -198,15 +205,14 @@ public:
arrow::Decimal128 value(high, low);
ARROW_RETURN_NOT_OK(builder.Append(value));
}
- return builder.Finish(&_arrays[_cur_field_idx]);
+ return arrow::Status::OK();
} else if (auto* decimal32_column = vectorized::check_and_get_column<
vectorized::ColumnDecimal<vectorized::Decimal32>>(
*vectorized::remove_nullable(_cur_col))) {
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(8,
decimal32_column->get_scale());
- arrow::Decimal128Builder builder(s_decimal_ptr, _pool);
ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = 0; i < num_rows; ++i) {
+ for (size_t i = start; i < start + num_rows; ++i) {
bool is_null = _cur_col->is_null_at(i);
if (is_null) {
ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -218,15 +224,14 @@ public:
arrow::Decimal128 value(high, *p_value > 0 ? *p_value :
-*p_value);
ARROW_RETURN_NOT_OK(builder.Append(value));
}
- return builder.Finish(&_arrays[_cur_field_idx]);
+ return arrow::Status::OK();
} else if (auto* decimal64_column = vectorized::check_and_get_column<
vectorized::ColumnDecimal<vectorized::Decimal64>>(
*vectorized::remove_nullable(_cur_col))) {
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(18,
decimal64_column->get_scale());
- arrow::Decimal128Builder builder(s_decimal_ptr, _pool);
ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = 0; i < num_rows; ++i) {
+ for (size_t i = start; i < start + num_rows; ++i) {
bool is_null = _cur_col->is_null_at(i);
if (is_null) {
ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -238,17 +243,18 @@ public:
arrow::Decimal128 value(high, *p_value > 0 ? *p_value :
-*p_value);
ARROW_RETURN_NOT_OK(builder.Append(value));
}
- return builder.Finish(&_arrays[_cur_field_idx]);
+ return arrow::Status::OK();
} else {
return arrow::Status::TypeError("Unsupported column:" +
_cur_col->get_name());
}
}
// process boolean
arrow::Status Visit(const arrow::BooleanType& type) override {
- arrow::BooleanBuilder builder(_pool);
- size_t num_rows = _block.rows();
+ auto& builder = assert_cast<arrow::BooleanBuilder&>(*_cur_builder);
+ size_t start = _cur_start;
+ size_t num_rows = _cur_rows;
ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = 0; i < num_rows; ++i) {
+ for (size_t i = start; i < start + num_rows; ++i) {
bool is_null = _cur_col->is_null_at(i);
if (is_null) {
ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -257,7 +263,56 @@ public:
const auto& data_ref = _cur_col->get_data_at(i);
ARROW_RETURN_NOT_OK(builder.Append(*(const bool*)data_ref.data));
}
- return builder.Finish(&_arrays[_cur_field_idx]);
+ return arrow::Status::OK();
+ }
+
+ // process array type
+ arrow::Status Visit(const arrow::ListType& type) override {
+ auto& builder = assert_cast<arrow::ListBuilder&>(*_cur_builder);
+ auto orignal_col = _cur_col;
+ size_t start = _cur_start;
+ size_t num_rows = _cur_rows;
+
+ const vectorized::ColumnArray* array_column = nullptr;
+ if (orignal_col->is_nullable()) {
+ auto nullable_column =
+ assert_cast<const
vectorized::ColumnNullable*>(orignal_col.get());
+ array_column = assert_cast<const vectorized::ColumnArray*>(
+ &nullable_column->get_nested_column());
+ } else {
+ array_column = assert_cast<const
vectorized::ColumnArray*>(orignal_col.get());
+ }
+ const auto& offsets = array_column->get_offsets();
+ vectorized::ColumnPtr nested_column = array_column->get_data_ptr();
+
+ // set current col/type/builder to nested
+ _cur_col = nested_column;
+ if (_cur_type->is_nullable()) {
+ auto nullable_type = assert_cast<const
vectorized::DataTypeNullable*>(_cur_type.get());
+ _cur_type = assert_cast<const vectorized::DataTypeArray*>(
+ nullable_type->get_nested_type().get())
+ ->get_nested_type();
+ } else {
+ _cur_type = assert_cast<const
vectorized::DataTypeArray*>(_cur_type.get())
+ ->get_nested_type();
+ }
+ _cur_builder = builder.value_builder();
+
+ ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
+ for (size_t i = start; i < start + num_rows; ++i) {
+ bool is_null = orignal_col->is_null_at(i);
+ if (is_null) {
+ ARROW_RETURN_NOT_OK(builder.AppendNull());
+ continue;
+ }
+ // append array elements in row i
+ ARROW_RETURN_NOT_OK(builder.Append());
+ _cur_start = offsets[i - 1];
+ _cur_rows = offsets[i] - offsets[i - 1];
+ ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*type.value_type(),
this));
+ }
+
+ return arrow::Status::OK();
}
Status convert(std::shared_ptr<arrow::RecordBatch>* out);
@@ -265,12 +320,12 @@ public:
private:
template <typename T>
arrow::Status _visit(const T& type) {
- arrow::NumericBuilder<T> builder(_pool);
-
- size_t num_rows = _block.rows();
+ auto& builder = assert_cast<arrow::NumericBuilder<T>&>(*_cur_builder);
+ size_t start = _cur_start;
+ size_t num_rows = _cur_rows;
ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
if (_cur_col->is_nullable()) {
- for (size_t i = 0; i < num_rows; ++i) {
+ for (size_t i = start; i < start + num_rows; ++i) {
bool is_null = _cur_col->is_null_at(i);
if (is_null) {
ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -281,9 +336,9 @@ private:
}
} else {
ARROW_RETURN_NOT_OK(builder.AppendValues(
- (const typename T::c_type*)_cur_col->get_data_at(0).data,
num_rows));
+ (const typename
T::c_type*)_cur_col->get_data_at(start).data, num_rows));
}
- return builder.Finish(&_arrays[_cur_field_idx]);
+ return arrow::Status::OK();
}
const vectorized::Block& _block;
@@ -291,8 +346,11 @@ private:
arrow::MemoryPool* _pool;
size_t _cur_field_idx;
+ size_t _cur_start;
+ size_t _cur_rows;
vectorized::ColumnPtr _cur_col;
vectorized::DataTypePtr _cur_type;
+ arrow::ArrayBuilder* _cur_builder = nullptr;
std::string _time_zone;
@@ -309,9 +367,21 @@ Status FromBlockConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) {
for (size_t idx = 0; idx < num_fields; ++idx) {
_cur_field_idx = idx;
+ _cur_start = 0;
+ _cur_rows = _block.rows();
_cur_col = _block.get_by_position(idx).column;
_cur_type = _block.get_by_position(idx).type;
- auto arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(),
this);
+ std::unique_ptr<arrow::ArrayBuilder> builder;
+ auto arrow_st = arrow::MakeBuilder(_pool, _schema->field(idx)->type(),
&builder);
+ if (!arrow_st.ok()) {
+ return to_status(arrow_st);
+ }
+ _cur_builder = builder.get();
+ arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(), this);
+ if (!arrow_st.ok()) {
+ return to_status(arrow_st);
+ }
+ arrow_st = _cur_builder->Finish(&_arrays[_cur_field_idx]);
if (!arrow_st.ok()) {
return to_status(arrow_st);
}
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index 508a495c69..51c490c1ae 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -92,6 +92,34 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
case TYPE_BOOLEAN:
*result = arrow::boolean();
break;
+ case TYPE_ARRAY: {
+ DCHECK_EQ(type.children.size(), 1);
+ std::shared_ptr<arrow::DataType> item_type;
+ convert_to_arrow_type(type.children[0], &item_type);
+ *result = std::make_shared<arrow::ListType>(item_type);
+ break;
+ }
+ case TYPE_MAP: {
+ DCHECK_EQ(type.children.size(), 2);
+ std::shared_ptr<arrow::DataType> key_type;
+ std::shared_ptr<arrow::DataType> val_type;
+ convert_to_arrow_type(type.children[0], &key_type);
+ convert_to_arrow_type(type.children[1], &val_type);
+ *result = std::make_shared<arrow::MapType>(key_type, val_type);
+ break;
+ }
+ case TYPE_STRUCT: {
+ DCHECK_GT(type.children.size(), 0);
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ for (size_t i = 0; i < type.children.size(); i++) {
+ std::shared_ptr<arrow::DataType> field_type;
+ convert_to_arrow_type(type.children[i], &field_type);
+
fields.push_back(std::make_shared<arrow::Field>(type.field_names[i], field_type,
+
type.contains_nulls[i]));
+ }
+ *result = std::make_shared<arrow::StructType>(fields);
+ break;
+ }
default:
return Status::InvalidArgument("Unknown primitive type({})",
type.type);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]