This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f999b823fc [feature](array) support array for apache arrow convertor 
(#17682)
f999b823fc is described below

commit f999b823fce0e92f80e45c88bb871907b2371612
Author: Kang <[email protected]>
AuthorDate: Tue Mar 14 17:53:16 2023 +0800

    [feature](array) support array for apache arrow convertor (#17682)
    
    * support array type for arrow
    
    * fix builder.Append() for each array row
    
    * fix array child column append start offset
---
 be/src/util/arrow/block_convertor.cpp | 126 ++++++++++++++++++++++++++--------
 be/src/util/arrow/row_batch.cpp       |  28 ++++++++
 2 files changed, 126 insertions(+), 28 deletions(-)

diff --git a/be/src/util/arrow/block_convertor.cpp 
b/be/src/util/arrow/block_convertor.cpp
index d4c4f2f6ba..3fbfcbd0ed 100644
--- a/be/src/util/arrow/block_convertor.cpp
+++ b/be/src/util/arrow/block_convertor.cpp
@@ -18,6 +18,7 @@
 #include "util/arrow/block_convertor.h"
 
 #include <arrow/array.h>
+#include <arrow/array/builder_nested.h>
 #include <arrow/array/builder_primitive.h>
 #include <arrow/buffer.h>
 #include <arrow/builder.h>
@@ -42,6 +43,11 @@
 #include "runtime/large_int_value.h"
 #include "util/arrow/utils.h"
 #include "util/types.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
+#include "vec/common/assert_cast.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_map.h"
 
 namespace doris {
 
@@ -81,10 +87,11 @@ public:
 
     // process string-transformable field
     arrow::Status Visit(const arrow::StringType& type) override {
-        arrow::StringBuilder builder(_pool);
-        size_t num_rows = _block.rows();
+        auto& builder = assert_cast<arrow::StringBuilder&>(*_cur_builder);
+        size_t start = _cur_start;
+        size_t num_rows = _cur_rows;
         ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = 0; i < num_rows; ++i) {
+        for (size_t i = start; i < start + num_rows; ++i) {
             bool is_null = _cur_col->is_null_at(i);
             if (is_null) {
                 ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -151,20 +158,21 @@ public:
             }
             }
         }
-        return builder.Finish(&_arrays[_cur_field_idx]);
+        return arrow::Status::OK();
     }
 
     // process doris Decimal
     arrow::Status Visit(const arrow::Decimal128Type& type) override {
-        size_t num_rows = _block.rows();
+        auto& builder = assert_cast<arrow::Decimal128Builder&>(*_cur_builder);
+        size_t start = _cur_start;
+        size_t num_rows = _cur_rows;
         if (auto* decimalv2_column = vectorized::check_and_get_column<
                     vectorized::ColumnDecimal<vectorized::Decimal128>>(
                     *vectorized::remove_nullable(_cur_col))) {
             std::shared_ptr<arrow::DataType> s_decimal_ptr =
                     std::make_shared<arrow::Decimal128Type>(27, 9);
-            arrow::Decimal128Builder builder(s_decimal_ptr, _pool);
             ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = 0; i < num_rows; ++i) {
+            for (size_t i = start; i < start + num_rows; ++i) {
                 bool is_null = _cur_col->is_null_at(i);
                 if (is_null) {
                     ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -177,15 +185,14 @@ public:
                 arrow::Decimal128 value(high, low);
                 ARROW_RETURN_NOT_OK(builder.Append(value));
             }
-            return builder.Finish(&_arrays[_cur_field_idx]);
+            return arrow::Status::OK();
         } else if (auto* decimal128_column = vectorized::check_and_get_column<
                            vectorized::ColumnDecimal<vectorized::Decimal128I>>(
                            *vectorized::remove_nullable(_cur_col))) {
             std::shared_ptr<arrow::DataType> s_decimal_ptr =
                     std::make_shared<arrow::Decimal128Type>(38, 
decimal128_column->get_scale());
-            arrow::Decimal128Builder builder(s_decimal_ptr, _pool);
             ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = 0; i < num_rows; ++i) {
+            for (size_t i = start; i < start + num_rows; ++i) {
                 bool is_null = _cur_col->is_null_at(i);
                 if (is_null) {
                     ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -198,15 +205,14 @@ public:
                 arrow::Decimal128 value(high, low);
                 ARROW_RETURN_NOT_OK(builder.Append(value));
             }
-            return builder.Finish(&_arrays[_cur_field_idx]);
+            return arrow::Status::OK();
         } else if (auto* decimal32_column = vectorized::check_and_get_column<
                            vectorized::ColumnDecimal<vectorized::Decimal32>>(
                            *vectorized::remove_nullable(_cur_col))) {
             std::shared_ptr<arrow::DataType> s_decimal_ptr =
                     std::make_shared<arrow::Decimal128Type>(8, 
decimal32_column->get_scale());
-            arrow::Decimal128Builder builder(s_decimal_ptr, _pool);
             ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = 0; i < num_rows; ++i) {
+            for (size_t i = start; i < start + num_rows; ++i) {
                 bool is_null = _cur_col->is_null_at(i);
                 if (is_null) {
                     ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -218,15 +224,14 @@ public:
                 arrow::Decimal128 value(high, *p_value > 0 ? *p_value : 
-*p_value);
                 ARROW_RETURN_NOT_OK(builder.Append(value));
             }
-            return builder.Finish(&_arrays[_cur_field_idx]);
+            return arrow::Status::OK();
         } else if (auto* decimal64_column = vectorized::check_and_get_column<
                            vectorized::ColumnDecimal<vectorized::Decimal64>>(
                            *vectorized::remove_nullable(_cur_col))) {
             std::shared_ptr<arrow::DataType> s_decimal_ptr =
                     std::make_shared<arrow::Decimal128Type>(18, 
decimal64_column->get_scale());
-            arrow::Decimal128Builder builder(s_decimal_ptr, _pool);
             ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-            for (size_t i = 0; i < num_rows; ++i) {
+            for (size_t i = start; i < start + num_rows; ++i) {
                 bool is_null = _cur_col->is_null_at(i);
                 if (is_null) {
                     ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -238,17 +243,18 @@ public:
                 arrow::Decimal128 value(high, *p_value > 0 ? *p_value : 
-*p_value);
                 ARROW_RETURN_NOT_OK(builder.Append(value));
             }
-            return builder.Finish(&_arrays[_cur_field_idx]);
+            return arrow::Status::OK();
         } else {
             return arrow::Status::TypeError("Unsupported column:" + 
_cur_col->get_name());
         }
     }
     // process boolean
     arrow::Status Visit(const arrow::BooleanType& type) override {
-        arrow::BooleanBuilder builder(_pool);
-        size_t num_rows = _block.rows();
+        auto& builder = assert_cast<arrow::BooleanBuilder&>(*_cur_builder);
+        size_t start = _cur_start;
+        size_t num_rows = _cur_rows;
         ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = 0; i < num_rows; ++i) {
+        for (size_t i = start; i < start + num_rows; ++i) {
             bool is_null = _cur_col->is_null_at(i);
             if (is_null) {
                 ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -257,7 +263,77 @@ public:
             const auto& data_ref = _cur_col->get_data_at(i);
             ARROW_RETURN_NOT_OK(builder.Append(*(const bool*)data_ref.data));
         }
-        return builder.Finish(&_arrays[_cur_field_idx]);
+        return arrow::Status::OK();
+    }
+
+    // Process array type: appends rows [_cur_start, _cur_start + _cur_rows) of the
+    // current (possibly nullable) array column to the current ListBuilder. The
+    // elements of each non-null row are appended by re-dispatching this visitor on
+    // the list's value type with the _cur_* cursor pointed at the nested column.
+    // The list-level cursor is put back before a successful return: convert() calls
+    // _cur_builder->Finish() right after the visit, and an enclosing list visit
+    // reuses the cursor for its next row.
+    arrow::Status Visit(const arrow::ListType& type) override {
+        auto& builder = assert_cast<arrow::ListBuilder&>(*_cur_builder);
+        // snapshot of the list-level cursor, restored once all rows are appended
+        const vectorized::ColumnPtr list_col = _cur_col;
+        const vectorized::DataTypePtr list_type = _cur_type;
+        arrow::ArrayBuilder* const list_builder = _cur_builder;
+        const size_t start = _cur_start;
+        const size_t num_rows = _cur_rows;
+
+        const vectorized::ColumnArray* array_column = nullptr;
+        if (list_col->is_nullable()) {
+            auto nullable_column = assert_cast<const vectorized::ColumnNullable*>(list_col.get());
+            array_column = assert_cast<const vectorized::ColumnArray*>(
+                    &nullable_column->get_nested_column());
+        } else {
+            array_column = assert_cast<const vectorized::ColumnArray*>(list_col.get());
+        }
+        const auto& offsets = array_column->get_offsets();
+        const vectorized::ColumnPtr nested_column = array_column->get_data_ptr();
+
+        // element type of the array, looking through an outer Nullable wrapper
+        vectorized::DataTypePtr nested_type;
+        if (list_type->is_nullable()) {
+            auto nullable_type =
+                    assert_cast<const vectorized::DataTypeNullable*>(list_type.get());
+            nested_type = assert_cast<const vectorized::DataTypeArray*>(
+                                  nullable_type->get_nested_type().get())
+                                  ->get_nested_type();
+        } else {
+            nested_type = assert_cast<const vectorized::DataTypeArray*>(list_type.get())
+                                  ->get_nested_type();
+        }
+
+        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
+        for (size_t i = start; i < start + num_rows; ++i) {
+            if (list_col->is_null_at(i)) {
+                ARROW_RETURN_NOT_OK(builder.AppendNull());
+                continue;
+            }
+            // open list slot i, then append its elements through the nested visitor
+            ARROW_RETURN_NOT_OK(builder.Append());
+            // Re-point the cursor on every row: visiting a nested list during the
+            // previous row may have moved _cur_col/_cur_type/_cur_builder.
+            _cur_col = nested_column;
+            _cur_type = nested_type;
+            _cur_builder = builder.value_builder();
+            // Elements of row i are [offsets[i - 1], offsets[i]); start row 0 at 0
+            // explicitly rather than reading offsets[-1].
+            const size_t row_begin = (i == 0) ? 0 : offsets[i - 1];
+            _cur_start = row_begin;
+            _cur_rows = offsets[i] - row_begin;
+            ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*type.value_type(), this));
+        }
+
+        // hand the list-level cursor back to the caller
+        _cur_col = list_col;
+        _cur_type = list_type;
+        _cur_builder = list_builder;
+        _cur_start = start;
+        _cur_rows = num_rows;
+        return arrow::Status::OK();
     }
 
     Status convert(std::shared_ptr<arrow::RecordBatch>* out);
@@ -265,12 +320,12 @@ public:
 private:
     template <typename T>
     arrow::Status _visit(const T& type) {
-        arrow::NumericBuilder<T> builder(_pool);
-
-        size_t num_rows = _block.rows();
+        auto& builder = assert_cast<arrow::NumericBuilder<T>&>(*_cur_builder);
+        size_t start = _cur_start;
+        size_t num_rows = _cur_rows;
         ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
         if (_cur_col->is_nullable()) {
-            for (size_t i = 0; i < num_rows; ++i) {
+            for (size_t i = start; i < start + num_rows; ++i) {
                 bool is_null = _cur_col->is_null_at(i);
                 if (is_null) {
                     ARROW_RETURN_NOT_OK(builder.AppendNull());
@@ -281,9 +336,9 @@ private:
             }
         } else {
             ARROW_RETURN_NOT_OK(builder.AppendValues(
-                    (const typename T::c_type*)_cur_col->get_data_at(0).data, 
num_rows));
+                    (const typename 
T::c_type*)_cur_col->get_data_at(start).data, num_rows));
         }
-        return builder.Finish(&_arrays[_cur_field_idx]);
+        return arrow::Status::OK();
     }
 
     const vectorized::Block& _block;
@@ -291,8 +346,11 @@ private:
     arrow::MemoryPool* _pool;
 
     size_t _cur_field_idx;
+    size_t _cur_start;
+    size_t _cur_rows;
     vectorized::ColumnPtr _cur_col;
     vectorized::DataTypePtr _cur_type;
+    arrow::ArrayBuilder* _cur_builder = nullptr;
 
     std::string _time_zone;
 
@@ -309,9 +367,21 @@ Status 
FromBlockConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) {
 
     for (size_t idx = 0; idx < num_fields; ++idx) {
         _cur_field_idx = idx;
+        _cur_start = 0;
+        _cur_rows = _block.rows();
         _cur_col = _block.get_by_position(idx).column;
         _cur_type = _block.get_by_position(idx).type;
-        auto arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(), 
this);
+        std::unique_ptr<arrow::ArrayBuilder> builder;
+        auto arrow_st = arrow::MakeBuilder(_pool, _schema->field(idx)->type(), 
&builder);
+        if (!arrow_st.ok()) {
+            return to_status(arrow_st);
+        }
+        _cur_builder = builder.get();
+        arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(), this);
+        if (!arrow_st.ok()) {
+            return to_status(arrow_st);
+        }
+        arrow_st = _cur_builder->Finish(&_arrays[_cur_field_idx]);
         if (!arrow_st.ok()) {
             return to_status(arrow_st);
         }
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index 508a495c69..51c490c1ae 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -92,6 +92,36 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
     case TYPE_BOOLEAN:
         *result = arrow::boolean();
         break;
+    // Nested types: convert every child recursively and propagate a child's failure,
+    // so an unsupported child never ends up wrapped as a null arrow::DataType.
+    case TYPE_ARRAY: {
+        DCHECK_EQ(type.children.size(), 1);
+        std::shared_ptr<arrow::DataType> item_type;
+        RETURN_IF_ERROR(convert_to_arrow_type(type.children[0], &item_type));
+        *result = std::make_shared<arrow::ListType>(item_type);
+        break;
+    }
+    case TYPE_MAP: {
+        DCHECK_EQ(type.children.size(), 2);
+        std::shared_ptr<arrow::DataType> key_type;
+        std::shared_ptr<arrow::DataType> val_type;
+        RETURN_IF_ERROR(convert_to_arrow_type(type.children[0], &key_type));
+        RETURN_IF_ERROR(convert_to_arrow_type(type.children[1], &val_type));
+        *result = std::make_shared<arrow::MapType>(key_type, val_type);
+        break;
+    }
+    case TYPE_STRUCT: {
+        DCHECK_GT(type.children.size(), 0);
+        std::vector<std::shared_ptr<arrow::Field>> fields;
+        for (size_t i = 0; i < type.children.size(); i++) {
+            std::shared_ptr<arrow::DataType> field_type;
+            RETURN_IF_ERROR(convert_to_arrow_type(type.children[i], &field_type));
+            fields.push_back(std::make_shared<arrow::Field>(type.field_names[i], field_type,
+                                                            type.contains_nulls[i]));
+        }
+        *result = std::make_shared<arrow::StructType>(fields);
+        break;
+    }
     default:
         return Status::InvalidArgument("Unknown primitive type({})", 
type.type);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to