This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e9a4cbcdf9 [Refact](type system) refact column with arrow serde
(#19091)
e9a4cbcdf9 is described below
commit e9a4cbcdf9442e3577e54e5742d9c26335453551
Author: amory <[email protected]>
AuthorDate: Thu May 4 15:28:46 2023 +0800
[Refact](type system) refact column with arrow serde (#19091)
* refact arrow serde
* add date serde
* update arrow and fix nullable and date type
---
be/src/util/arrow/block_convertor.cpp | 6 +-
be/src/vec/CMakeLists.txt | 3 +
be/src/vec/columns/column.h | 1 +
be/src/vec/data_types/data_type_date.h | 3 +
be/src/vec/data_types/data_type_date_time.h | 3 +
be/src/vec/data_types/data_type_time_v2.h | 6 +-
.../vec/data_types/serde/data_type_array_serde.cpp | 40 +++
.../vec/data_types/serde/data_type_array_serde.h | 6 +
.../vec/data_types/serde/data_type_bitmap_serde.h | 9 +
.../data_types/serde/data_type_date64_serde.cpp | 111 +++++++
...pe_array_serde.cpp => data_type_date64_serde.h} | 41 +--
.../serde/data_type_datetimev2_serde.cpp | 49 +++
..._array_serde.h => data_type_datetimev2_serde.h} | 42 ++-
.../data_types/serde/data_type_datev2_serde.cpp | 68 +++++
...pe_array_serde.cpp => data_type_datev2_serde.h} | 41 +--
.../data_types/serde/data_type_decimal_serde.cpp | 133 ++++++++-
.../vec/data_types/serde/data_type_decimal_serde.h | 6 +
.../serde/data_type_fixedlengthobject_serde.h | 9 +
.../vec/data_types/serde/data_type_hll_serde.cpp | 21 ++
be/src/vec/data_types/serde/data_type_hll_serde.h | 7 +
.../vec/data_types/serde/data_type_map_serde.cpp | 12 +
be/src/vec/data_types/serde/data_type_map_serde.h | 5 +
.../data_types/serde/data_type_nullable_serde.cpp | 37 +++
.../data_types/serde/data_type_nullable_serde.h | 5 +
.../data_types/serde/data_type_number_serde.cpp | 110 ++++++-
.../vec/data_types/serde/data_type_number_serde.h | 12 +
.../vec/data_types/serde/data_type_object_serde.h | 10 +
.../serde/data_type_quantilestate_serde.h | 9 +
be/src/vec/data_types/serde/data_type_serde.h | 24 ++
.../data_types/serde/data_type_string_serde.cpp | 60 ++++
.../vec/data_types/serde/data_type_string_serde.h | 6 +
.../data_types/serde/data_type_struct_serde.cpp | 12 +
.../vec/data_types/serde/data_type_struct_serde.h | 6 +
be/src/vec/utils/arrow_column_to_doris_column.cpp | 329 +--------------------
be/test/CMakeLists.txt | 3 +-
.../serde/data_type_serde_arrow_test.cpp | 321 ++++++++++++++++++++
.../data_types/serde/data_type_serde_pb_test.cpp | 200 +++++++++++++
37 files changed, 1375 insertions(+), 391 deletions(-)
diff --git a/be/src/util/arrow/block_convertor.cpp
b/be/src/util/arrow/block_convertor.cpp
index 2b426ca2f2..89a1e09a73 100644
--- a/be/src/util/arrow/block_convertor.cpp
+++ b/be/src/util/arrow/block_convertor.cpp
@@ -389,10 +389,8 @@ Status
FromBlockConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) {
return to_status(arrow_st);
}
_cur_builder = builder.get();
- arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(), this);
- if (!arrow_st.ok()) {
- return to_status(arrow_st);
- }
+ _cur_type->get_serde()->write_column_to_arrow(*_cur_col, nullptr,
_cur_builder, _cur_start,
+ _cur_start + _cur_rows);
arrow_st = _cur_builder->Finish(&_arrays[_cur_field_idx]);
if (!arrow_st.ok()) {
return to_status(arrow_st);
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 6c82623816..086a301456 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -87,6 +87,9 @@ set(VEC_FILES
data_types/serde/data_type_array_serde.cpp
data_types/serde/data_type_struct_serde.cpp
data_types/serde/data_type_number_serde.cpp
+ data_types/serde/data_type_datev2_serde.cpp
+ data_types/serde/data_type_datetimev2_serde.cpp
+ data_types/serde/data_type_date64_serde.cpp
data_types/serde/data_type_string_serde.cpp
data_types/serde/data_type_decimal_serde.cpp
data_types/serde/data_type_object_serde.cpp
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index e30c35ba31..8bf83b533c 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -148,6 +148,7 @@ public:
virtual void set_rowset_segment_id(std::pair<RowsetId, uint32_t>
rowset_segment_id) {}
virtual std::pair<RowsetId, uint32_t> get_rowset_segment_id() const {
return {}; }
+    // TODO(Amory): getting the data type from a column is not correct: a column is only in-memory data, so we cannot assume which data type that memory belongs to.
virtual TypeIndex get_data_type() const {
LOG(FATAL) << "Cannot get_data_type() column " << get_name();
__builtin_unreachable();
diff --git a/be/src/vec/data_types/data_type_date.h
b/be/src/vec/data_types/data_type_date.h
index 6648932d5f..54e45e63d3 100644
--- a/be/src/vec/data_types/data_type_date.h
+++ b/be/src/vec/data_types/data_type_date.h
@@ -32,6 +32,7 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_number_base.h"
+#include "vec/data_types/serde/data_type_date64_serde.h"
namespace doris {
namespace vectorized {
@@ -64,6 +65,8 @@ public:
static void cast_to_date(Int64& x);
MutableColumnPtr create_column() const override;
+
+ // DATE values use DataTypeDate64SerDe, the VecDateTimeValue-over-Int64 serde.
+ DataTypeSerDeSPtr get_serde() const override { return
std::make_shared<DataTypeDate64SerDe>(); }
};
} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_date_time.h
b/be/src/vec/data_types/data_type_date_time.h
index c4056f3612..98aa4b26be 100644
--- a/be/src/vec/data_types/data_type_date_time.h
+++ b/be/src/vec/data_types/data_type_date_time.h
@@ -32,6 +32,7 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_number_base.h"
+#include "vec/data_types/serde/data_type_date64_serde.h"
namespace doris {
namespace vectorized {
@@ -84,6 +85,8 @@ public:
std::string to_string(const IColumn& column, size_t row_num) const
override;
+ // DATETIME values use DataTypeDate64SerDe, the VecDateTimeValue-over-Int64 serde.
+ DataTypeSerDeSPtr get_serde() const override { return
std::make_shared<DataTypeDate64SerDe>(); }
+
void to_string(const IColumn& column, size_t row_num, BufferWritable&
ostr) const override;
Status from_string(ReadBuffer& rb, IColumn* column) const override;
diff --git a/be/src/vec/data_types/data_type_time_v2.h
b/be/src/vec/data_types/data_type_time_v2.h
index 8872eeafc0..336048b39a 100644
--- a/be/src/vec/data_types/data_type_time_v2.h
+++ b/be/src/vec/data_types/data_type_time_v2.h
@@ -34,6 +34,8 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_number_base.h"
+#include "vec/data_types/serde/data_type_datetimev2_serde.h"
+#include "vec/data_types/serde/data_type_datev2_serde.h"
#include "vec/data_types/serde/data_type_number_serde.h"
#include "vec/data_types/serde/data_type_serde.h"
@@ -66,6 +68,8 @@ public:
bool can_be_used_as_version() const override { return true; }
bool can_be_inside_nullable() const override { return true; }
+ // DATEV2 values get a dedicated serde instead of the plain UInt32 number serde.
+ DataTypeSerDeSPtr get_serde() const override { return
std::make_shared<DataTypeDateV2SerDe>(); }
+
bool equals(const IDataType& rhs) const override;
std::string to_string(const IColumn& column, size_t row_num) const
override;
void to_string(const IColumn& column, size_t row_num, BufferWritable&
ostr) const override;
@@ -113,7 +117,7 @@ public:
void to_string(const IColumn& column, size_t row_num, BufferWritable&
ostr) const override;
Status from_string(ReadBuffer& rb, IColumn* column) const override;
DataTypeSerDeSPtr get_serde() const override {
- return std::make_shared<DataTypeNumberSerDe<UInt64>>();
+ // DATETIMEV2 needs its own serde so arrow export formats values as strings
+ // instead of treating them as plain UInt64 numbers.
+ return std::make_shared<DataTypeDateTimeV2SerDe>();
};
MutableColumnPtr create_column() const override;
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp
b/be/src/vec/data_types/serde/data_type_array_serde.cpp
index 8ae9c61858..15c1a4f683 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp
@@ -17,8 +17,13 @@
#include "data_type_array_serde.h"
+#include <arrow/array/builder_nested.h>
+
+#include "gutil/casts.h"
#include "util/jsonb_document.h"
#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
+#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
namespace doris {
@@ -43,5 +48,40 @@ void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn&
column, const JsonbVa
column.deserialize_and_insert_from_arena(blob->getBlob());
}
+void DataTypeArraySerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                                               arrow::ArrayBuilder* array_builder, int start,
+                                               int end) const {
+    // Writes rows [start, end) of an array column into an arrow::ListBuilder.
+    // `null_map`, when non-null, flags null *array rows* and is indexed like `column`.
+    // It must not be forwarded to the nested serde: the nested rows are the flattened
+    // elements (addressed through offsets), so the indexes would not line up. Element-level
+    // nullability is left to the nested serde itself.
+    auto& array_column = static_cast<const ColumnArray&>(column);
+    auto& offsets = array_column.get_offsets();
+    auto& nested_data = array_column.get_data();
+    auto& builder = assert_cast<arrow::ListBuilder&>(*array_builder);
+    auto nested_builder = builder.value_builder();
+    for (size_t array_idx = start; array_idx < end; ++array_idx) {
+        if (null_map && null_map[array_idx]) {
+            checkArrowStatus(builder.AppendNull(), column.get_name(),
+                             array_builder->type()->name());
+            continue;
+        }
+        checkArrowStatus(builder.Append(), column.get_name(), array_builder->type()->name());
+        // NOTE(review): relies on ColumnArray offsets being readable at index -1 for
+        // array_idx == 0 (as the original code already did) -- confirm.
+        nested_serde->write_column_to_arrow(nested_data, nullptr, nested_builder,
+                                            offsets[array_idx - 1], offsets[array_idx]);
+    }
+}
+
+void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array,
+                                                int start, int end,
+                                                const cctz::time_zone& ctz) const {
+    // Appends arrow list entries [start, end) to `column`. Each arrow offset is rebased so the
+    // first appended element lands right after the column's current last offset. The slice
+    // [offset(start), offset(end)) of the arrow child array is then handed to nested_serde.
+    const auto* list_array = down_cast<const arrow::ListArray*>(arrow_array);
+    auto offsets_holder = list_array->offsets();
+    auto* list_offsets = down_cast<arrow::Int32Array*>(offsets_holder.get());
+    const auto child_begin = list_offsets->Value(start);
+    const auto child_end = list_offsets->Value(end);
+
+    auto& target = static_cast<ColumnArray&>(column);
+    auto& target_offsets = target.get_offsets();
+    const auto base = target_offsets.back();
+    for (int64_t row = start; row < end; ++row) {
+        target_offsets.emplace_back(base + list_offsets->Value(row + 1) - child_begin);
+    }
+    nested_serde->read_column_from_arrow(target.get_data(), list_array->values().get(),
+                                         child_begin, child_end, ctz);
+}
+
} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h
b/be/src/vec/data_types/serde/data_type_array_serde.h
index cf28e33728..e8d08bbf10 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.h
+++ b/be/src/vec/data_types/serde/data_type_array_serde.h
@@ -51,6 +51,12 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
+ // Writes rows [start, end) into an arrow::ListBuilder; element values are
+ // delegated to nested_serde.
+ void write_column_to_arrow(const IColumn& column, const UInt8*
null_bytemap,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override;
+ // Appends arrow list entries [start, end), rebasing offsets and delegating the
+ // child values to nested_serde.
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override;
+
private:
DataTypeSerDeSPtr nested_serde;
};
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
index 23a2a689d9..dc3dd8f096 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
@@ -41,6 +41,15 @@ public:
int32_t col_id, int row_num) const override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
+ // Not implemented: exporting a bitmap column to arrow aborts via LOG(FATAL).
+ void write_column_to_arrow(const IColumn& column, const UInt8*
null_bytemap,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override {
+ LOG(FATAL) << "Not support write bitmap column to arrow";
+ }
+ // Not implemented: importing arrow data into a bitmap column aborts via LOG(FATAL).
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override {
+ LOG(FATAL) << "Not support read bitmap column from arrow";
+ }
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
new file mode 100644
index 0000000000..1f6a3766f5
--- /dev/null
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "data_type_date64_serde.h"
+
+#include <arrow/builder.h>
+
+#include <type_traits>
+
+#include "gutil/casts.h"
+
+namespace doris {
+namespace vectorized {
+
+void DataTypeDate64SerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                                                arrow::ArrayBuilder* array_builder, int start,
+                                                int end) const {
+    // Exports DATE/DATETIME values (VecDateTimeValue bit patterns stored as Int64) as their
+    // textual form through an arrow::StringBuilder; rows flagged in `null_map` become nulls.
+    auto& col_data = static_cast<const ColumnVector<Int64>&>(column).get_data();
+    auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+    for (size_t i = start; i < end; ++i) {
+        if (null_map && null_map[i]) {
+            checkArrowStatus(string_builder.AppendNull(), column.get_name(),
+                             array_builder->type()->name());
+            continue;
+        }
+        // Only non-null rows are formatted; the payload of a null slot is presumably
+        // unspecified and need not be a valid date.
+        char buf[64];
+        const auto* time_val = reinterpret_cast<const VecDateTimeValue*>(&col_data[i]);
+        int len = time_val->to_buffer(buf);
+        checkArrowStatus(string_builder.Append(buf, len), column.get_name(),
+                         array_builder->type()->name());
+    }
+}
+
+// Returns how many ticks of `unit` make up one second, or 0 for an unrecognised unit.
+// Doris only supports second precision here, so arrow timestamps are divided by this factor.
+static int64_t time_unit_divisor(arrow::TimeUnit::type unit) {
+    using Unit = arrow::TimeUnit::type;
+    if (unit == Unit::SECOND) {
+        return 1L;
+    }
+    if (unit == Unit::MILLI) {
+        return 1000L;
+    }
+    if (unit == Unit::MICRO) {
+        return 1000L * 1000L;
+    }
+    if (unit == Unit::NANO) {
+        return 1000L * 1000L * 1000L;
+    }
+    return 0L;
+}
+
+void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array,
+                                                 int start, int end,
+                                                 const cctz::time_zone& ctz) const {
+    // Appends arrow values [start, end) to an Int64 column holding VecDateTimeValue.
+    // Supported inputs:
+    //   - DATE64: milliseconds since epoch
+    //   - TIMESTAMP: any time unit
+    //   - DATE32: days since epoch, truncated to a date
+    // Each raw value is turned into unix seconds as `raw / divisor * multiplier` and
+    // interpreted in `ctz`.
+    auto& col_data = static_cast<ColumnVector<Int64>&>(column).get_data();
+    auto append_values = [&](const auto* concrete_array, int64_t divisor, int64_t multiplier,
+                             bool to_date) {
+        for (size_t value_i = start; value_i < end; ++value_i) {
+            VecDateTimeValue v;
+            v.from_unixtime(
+                    static_cast<Int64>(concrete_array->Value(value_i)) / divisor * multiplier,
+                    ctz);
+            if (to_date) {
+                v.cast_to_date();
+            }
+            col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
+        }
+    };
+
+    switch (arrow_array->type()->id()) {
+    case arrow::Type::DATE64:
+        // ms => secs
+        append_values(down_cast<const arrow::Date64Array*>(arrow_array), 1000, 1, false);
+        break;
+    case arrow::Type::TIMESTAMP: {
+        const auto type = std::static_pointer_cast<arrow::TimestampType>(arrow_array->type());
+        const int64_t divisor = time_unit_divisor(type->unit());
+        if (divisor == 0L) {
+            LOG(FATAL) << "Invalid Time Type:" << type->name();
+        }
+        append_values(down_cast<const arrow::TimestampArray*>(arrow_array), divisor, 1, false);
+        break;
+    }
+    case arrow::Type::DATE32:
+        // day => secs
+        append_values(down_cast<const arrow::Date32Array*>(arrow_array), 1, 24 * 60 * 60, true);
+        break;
+    default:
+        // An unsupported type must not silently append nothing: the column would end up
+        // shorter than the arrow slice it was asked to read.
+        LOG(FATAL) << "Not support read " << arrow_array->type()->name() << " to "
+                   << column.get_name();
+    }
+}
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp
b/be/src/vec/data_types/serde/data_type_date64_serde.h
similarity index 54%
copy from be/src/vec/data_types/serde/data_type_array_serde.cpp
copy to be/src/vec/data_types/serde/data_type_date64_serde.h
index 8ae9c61858..80a0a1518c 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.h
@@ -15,33 +15,38 @@
// specific language governing permissions and limitations
// under the License.
-#include "data_type_array_serde.h"
+#pragma once
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <ostream>
+#include <string>
+
+#include "common/status.h"
+#include "data_type_number_serde.h"
+#include "olap/olap_common.h"
#include "util/jsonb_document.h"
+#include "util/jsonb_writer.h"
#include "vec/columns/column.h"
+#include "vec/columns/column_vector.h"
#include "vec/common/string_ref.h"
+#include "vec/core/types.h"
namespace doris {
+class JsonbOutStream;
namespace vectorized {
class Arena;
-void DataTypeArraySerDe::write_one_cell_to_jsonb(const IColumn& column,
JsonbWriter& result,
- Arena* mem_pool, int32_t
col_id,
- int row_num) const {
- result.writeKey(col_id);
- const char* begin = nullptr;
- // maybe serialize_value_into_arena should move to here later.
- StringRef value = column.serialize_value_into_arena(row_num, *mem_pool,
begin);
- result.writeStartBinary();
- result.writeBinary(value.data, value.size);
- result.writeEndBinary();
-}
-
-void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn& column, const
JsonbValue* arg) const {
- auto blob = static_cast<const JsonbBlobVal*>(arg);
- column.deserialize_and_insert_from_arena(blob->getBlob());
-}
-
+// Arrow serde for DATE/DATETIME columns (VecDateTimeValue stored as Int64).
+class DataTypeDate64SerDe : public DataTypeNumberSerDe<Int64> {
+public:
+    // Exports values as arrow utf8 strings; rows flagged in `null_bytemap` become nulls.
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_bytemap,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    // Imports arrow DATE64, TIMESTAMP and DATE32 arrays.
+    void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const override;
+};
} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
new file mode 100644
index 0000000000..a8476197dd
--- /dev/null
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "data_type_datetimev2_serde.h"
+
+#include <arrow/builder.h>
+
+#include <type_traits>
+
+#include "gutil/casts.h"
+
+namespace doris {
+namespace vectorized {
+
+void DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                                                    arrow::ArrayBuilder* array_builder, int start,
+                                                    int end) const {
+    // Exports DATETIMEV2 values (UInt64 bit patterns of DateV2Value<DateTimeV2ValueType>) as
+    // their textual form through an arrow::StringBuilder; flagged rows become nulls.
+    auto& col_data = static_cast<const ColumnVector<UInt64>&>(column).get_data();
+    auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+    for (size_t i = start; i < end; ++i) {
+        if (null_map && null_map[i]) {
+            checkArrowStatus(string_builder.AppendNull(), column.get_name(),
+                             array_builder->type()->name());
+            continue;
+        }
+        char buf[64];
+        // Reinterpret the *address* of the stored UInt64 as a date value. The previous code
+        // cast the integer value itself to a pointer, which dereferences a garbage address.
+        const auto* time_val =
+                reinterpret_cast<const DateV2Value<DateTimeV2ValueType>*>(&col_data[i]);
+        int len = time_val->to_buffer(buf);
+        checkArrowStatus(string_builder.Append(buf, len), column.get_name(),
+                         array_builder->type()->name());
+    }
+}
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
similarity index 53%
copy from be/src/vec/data_types/serde/data_type_array_serde.h
copy to be/src/vec/data_types/serde/data_type_datetimev2_serde.h
index cf28e33728..7302a1d4d5 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.h
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h
@@ -17,42 +17,38 @@
#pragma once
+#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
+#include <stddef.h>
#include <stdint.h>
#include <ostream>
+#include <string>
#include "common/status.h"
-#include "data_type_serde.h"
+#include "data_type_number_serde.h"
+#include "olap/olap_common.h"
+#include "util/jsonb_document.h"
#include "util/jsonb_writer.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_vector.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/types.h"
namespace doris {
-class PValues;
-class JsonbValue;
+class JsonbOutStream;
namespace vectorized {
-class IColumn;
class Arena;
-class DataTypeArraySerDe : public DataTypeSerDe {
-public:
- DataTypeArraySerDe(const DataTypeSerDeSPtr& _nested_serde) :
nested_serde(_nested_serde) {}
-
- Status write_column_to_pb(const IColumn& column, PValues& result, int
start,
- int end) const override {
- LOG(FATAL) << "Not support write array column to pb";
- }
- Status read_column_from_pb(IColumn& column, const PValues& arg) const
override {
- LOG(FATAL) << "Not support read from pb to array";
+// Arrow serde for DATETIMEV2 columns (DateV2Value<DateTimeV2ValueType> stored as UInt64).
+class DataTypeDateTimeV2SerDe : public DataTypeNumberSerDe<UInt64> {
+public:
+    // Exports values as arrow utf8 strings; rows flagged in `null_map` become nulls.
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    // Not implemented yet: importing arrow data aborts via LOG(FATAL).
+    void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const override {
+        LOG(FATAL) << "not support read arrow array to uint64 column";
}
-
- void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result,
Arena* mem_pool,
- int32_t col_id, int row_num) const override;
-
- void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
-
-private:
- DataTypeSerDeSPtr nested_serde;
};
} // namespace vectorized
-} // namespace doris
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
new file mode 100644
index 0000000000..4095c8d872
--- /dev/null
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "data_type_datev2_serde.h"
+
+#include <arrow/builder.h>
+
+#include <type_traits>
+
+#include "gutil/casts.h"
+
+namespace doris {
+namespace vectorized {
+
+void DataTypeDateV2SerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                                                arrow::ArrayBuilder* array_builder, int start,
+                                                int end) const {
+    // Exports DATEV2 values (UInt32 bit patterns of DateV2Value<DateV2ValueType>) as their
+    // textual form through an arrow::StringBuilder; rows flagged in `null_map` become nulls.
+    auto& col_data = static_cast<const ColumnVector<UInt32>&>(column).get_data();
+    auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+    for (size_t i = start; i < end; ++i) {
+        if (null_map && null_map[i]) {
+            checkArrowStatus(string_builder.AppendNull(), column.get_name(),
+                             array_builder->type()->name());
+            continue;
+        }
+        // Only non-null rows are formatted; the payload of a null slot is presumably
+        // unspecified and need not be a valid date.
+        char buf[64];
+        const auto* time_val =
+                reinterpret_cast<const DateV2Value<DateV2ValueType>*>(&col_data[i]);
+        int len = time_val->to_buffer(buf);
+        checkArrowStatus(string_builder.Append(buf, len), column.get_name(),
+                         array_builder->type()->name());
+    }
+}
+
+void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array,
+                                                 int start, int end,
+                                                 const cctz::time_zone& ctz) const {
+    // Appends arrow DATE32 (days since epoch) or DATE64 (milliseconds since epoch) values
+    // [start, end) to a UInt32 column holding DateV2Value<DateV2ValueType>. Each raw value is
+    // turned into unix seconds as `raw / divisor * multiplier` and interpreted in `ctz`.
+    auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data();
+    auto append_values = [&](const auto* concrete_array, int64_t divisor, int64_t multiplier) {
+        for (size_t value_i = start; value_i < end; ++value_i) {
+            DateV2Value<DateV2ValueType> v;
+            v.from_unixtime(
+                    static_cast<Int64>(concrete_array->Value(value_i)) / divisor * multiplier,
+                    ctz);
+            col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, UInt32>(v));
+        }
+    };
+
+    // The concrete array type must match the arrow type id: reading a Date32 buffer through a
+    // Date64Array (or scaling Date64 milliseconds by a day multiplier) yields garbage dates.
+    switch (arrow_array->type_id()) {
+    case arrow::Type::DATE32:
+        // day => secs
+        append_values(down_cast<const arrow::Date32Array*>(arrow_array), 1, 24 * 60 * 60);
+        break;
+    case arrow::Type::DATE64:
+        // ms => secs
+        append_values(down_cast<const arrow::Date64Array*>(arrow_array), 1000, 1);
+        break;
+    default:
+        LOG(FATAL) << "Not support read " << arrow_array->type()->name() << " to "
+                   << column.get_name();
+    }
+}
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp
b/be/src/vec/data_types/serde/data_type_datev2_serde.h
similarity index 54%
copy from be/src/vec/data_types/serde/data_type_array_serde.cpp
copy to be/src/vec/data_types/serde/data_type_datev2_serde.h
index 8ae9c61858..587f2be0c2 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.h
@@ -15,33 +15,38 @@
// specific language governing permissions and limitations
// under the License.
-#include "data_type_array_serde.h"
+#pragma once
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <ostream>
+#include <string>
+
+#include "common/status.h"
+#include "data_type_number_serde.h"
+#include "olap/olap_common.h"
#include "util/jsonb_document.h"
+#include "util/jsonb_writer.h"
#include "vec/columns/column.h"
+#include "vec/columns/column_vector.h"
#include "vec/common/string_ref.h"
+#include "vec/core/types.h"
namespace doris {
+class JsonbOutStream;
namespace vectorized {
class Arena;
-void DataTypeArraySerDe::write_one_cell_to_jsonb(const IColumn& column,
JsonbWriter& result,
- Arena* mem_pool, int32_t
col_id,
- int row_num) const {
- result.writeKey(col_id);
- const char* begin = nullptr;
- // maybe serialize_value_into_arena should move to here later.
- StringRef value = column.serialize_value_into_arena(row_num, *mem_pool,
begin);
- result.writeStartBinary();
- result.writeBinary(value.data, value.size);
- result.writeEndBinary();
-}
-
-void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn& column, const
JsonbValue* arg) const {
- auto blob = static_cast<const JsonbBlobVal*>(arg);
- column.deserialize_and_insert_from_arena(blob->getBlob());
-}
-
+// Arrow serde for DATEV2 columns (DateV2Value<DateV2ValueType> stored as UInt32).
+class DataTypeDateV2SerDe : public DataTypeNumberSerDe<UInt32> {
+public:
+    // Exports values as arrow utf8 strings; rows flagged in `null_bytemap` become nulls.
+    void write_column_to_arrow(const IColumn& column, const UInt8* null_bytemap,
+                               arrow::ArrayBuilder* array_builder, int start,
+                               int end) const override;
+    // Imports arrow date arrays into the DATEV2 column.
+    void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
+                                int end, const cctz::time_zone& ctz) const override;
+};
} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
index 6e6fa5f4d3..893e10769a 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
@@ -16,7 +16,138 @@
// under the License.
#include "data_type_decimal_serde.h"
+
+#include <arrow/array/array_base.h>
+#include <arrow/array/array_decimal.h>
+#include <arrow/builder.h>
+#include <arrow/util/decimal.h>
+
+#include "arrow/type.h"
+#include "gutil/casts.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/common/arithmetic_overflow.h"
+
namespace doris {
-namespace vectorized {} // namespace vectorized
+namespace vectorized {
+
+template <typename T>
+void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+                                                    arrow::ArrayBuilder* array_builder, int start,
+                                                    int end) const {
+    // Appends rows [start, end) of a decimal column to an arrow::Decimal128Builder. Only the
+    // unscaled integer is written; precision/scale come from the builder's arrow type, which
+    // the caller sets up. Rows flagged in `null_map` become arrow nulls.
+    if constexpr (std::is_same_v<T, Decimal<Int128>> || std::is_same_v<T, Decimal<Int128I>> ||
+                  std::is_same_v<T, Decimal<Int32>> || std::is_same_v<T, Decimal<Int64>>) {
+        auto& col = static_cast<const ColumnDecimal<T>&>(column);
+        auto& builder = static_cast<arrow::Decimal128Builder&>(*array_builder);
+        for (size_t i = start; i < end; ++i) {
+            if (null_map && null_map[i]) {
+                checkArrowStatus(builder.AppendNull(), column.get_name(),
+                                 array_builder->type()->name());
+                continue;
+            }
+            const auto& data_ref = col.get_data_at(i);
+            // arrow::Decimal128 is a 128-bit two's-complement integer, so narrower values must
+            // be sign-extended (not encoded as sign bit + magnitude, which corrupts negatives
+            // and turns 0 into -2^127).
+            const arrow::Decimal128 value = [&]() {
+                if constexpr (std::is_same_v<T, Decimal<Int32>>) {
+                    return arrow::Decimal128(static_cast<int64_t>(
+                            *reinterpret_cast<const int32_t*>(data_ref.data)));
+                } else if constexpr (std::is_same_v<T, Decimal<Int64>>) {
+                    return arrow::Decimal128(static_cast<int64_t>(
+                            *reinterpret_cast<const int64_t*>(data_ref.data)));
+                } else {
+                    const auto* p_value = reinterpret_cast<const PackedInt128*>(data_ref.data);
+                    return arrow::Decimal128(static_cast<int64_t>(p_value->value >> 64),
+                                             static_cast<uint64_t>(p_value->value));
+                }
+            }();
+            checkArrowStatus(builder.Append(value), column.get_name(),
+                             array_builder->type()->name());
+        }
+    } else {
+        LOG(FATAL) << "Not support write " << column.get_name() << " to arrow";
+    }
+}
+
+template <typename T>
+void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
+                                                     const arrow::Array* arrow_array, int start,
+                                                     int end, const cctz::time_zone& ctz) const {
+    // Only Decimal<Int128> columns can be filled from arrow. Values of the arrow decimal array
+    // [start, end) are rescaled from the arrow scale to scale 9; upscaling saturates on
+    // overflow. Other decimal widths abort via LOG(FATAL).
+    if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+        auto& column_data = static_cast<ColumnDecimal<vectorized::Decimal128>&>(column).get_data();
+        auto concrete_array = down_cast<const arrow::DecimalArray*>(arrow_array);
+        const auto* arrow_decimal_type =
+                static_cast<const arrow::DecimalType*>(arrow_array->type().get());
+        // TODO check precision
+        const auto scale = arrow_decimal_type->scale();
+        for (size_t value_i = start; value_i < end; ++value_i) {
+            auto value = *reinterpret_cast<const vectorized::Decimal128*>(
+                    concrete_array->Value(value_i));
+            // convert scale to 9;
+            if (9 > scale) {
+                using MaxNativeType = typename Decimal128::NativeType;
+                const auto source = static_cast<MaxNativeType>(value);
+                MaxNativeType converted_value = common::exp10_i128(9 - scale);
+                if (common::mul_overflow(source, converted_value, converted_value)) {
+                    VLOG_DEBUG << "Decimal convert overflow";
+                    // Saturate in the direction of the *source* value: after an overflow the
+                    // sign of the wrapped product says nothing about the true result.
+                    value = source < 0 ? std::numeric_limits<MaxNativeType>::min()
+                                       : std::numeric_limits<MaxNativeType>::max();
+                } else {
+                    value = converted_value;
+                }
+            } else if (9 < scale) {
+                value = value / common::exp10_i128(scale - 9);
+            }
+            column_data.emplace_back(value);
+        }
+    } else {
+        LOG(FATAL) << "Not support read " << column.get_name() << " from arrow";
+    }
+}
+
+// The member templates above are defined in this .cpp, so every supported decimal
+// width is explicitly instantiated here.
+template class DataTypeDecimalSerDe<Decimal32>;
+template class DataTypeDecimalSerDe<Decimal64>;
+template class DataTypeDecimalSerDe<Decimal128>;
+template class DataTypeDecimalSerDe<Decimal128I>;
+} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.h
b/be/src/vec/data_types/serde/data_type_decimal_serde.h
index 79293e9dd8..0e9685d936 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.h
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.h
@@ -54,6 +54,12 @@ public:
int32_t col_id, int row_num) const override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
+
+ void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override;
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override;
};
template <typename T>
diff --git a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h
b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h
index 7d5db6c041..af02dc9824 100644
--- a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h
+++ b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h
@@ -51,6 +51,15 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override {
LOG(FATAL) << "Not support read from jsonb to FixedLengthObject";
}
+ void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override {
+ LOG(FATAL) << "Not support write FixedLengthObject column to arrow";
+ }
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override {
+ LOG(FATAL) << "Not support read FixedLengthObject column from arrow";
+ }
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
index b7e1afb3a7..a3fc40aebc 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
@@ -23,6 +23,7 @@
#include <string>
+#include "arrow/array/builder_binary.h"
#include "olap/hll.h"
#include "util/jsonb_document.h"
#include "util/slice.h"
@@ -79,5 +80,25 @@ void DataTypeHLLSerDe::read_one_cell_from_jsonb(IColumn&
column, const JsonbValu
col.insert_value(hyper_log_log);
}
+void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const
UInt8* null_map,
+ arrow::ArrayBuilder*
array_builder, int start,
+ int end) const {
+ const auto& col = assert_cast<const ColumnHLL&>(column);
+ auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+ for (size_t string_i = start; string_i < end; ++string_i) {
+ if (null_map && null_map[string_i]) {
+ checkArrowStatus(builder.AppendNull(), column.get_name(),
+ array_builder->type()->name());
+ } else {
+ auto& hll_value =
const_cast<HyperLogLog&>(col.get_element(string_i));
+ std::string memory_buffer(hll_value.max_serialized_size(), '0');
+ hll_value.serialize((uint8_t*)memory_buffer.data());
+ checkArrowStatus(
+ builder.Append(memory_buffer.data(),
static_cast<int>(memory_buffer.size())),
+ column.get_name(), array_builder->type()->name());
+ }
+ }
+}
+
} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.h
b/be/src/vec/data_types/serde/data_type_hll_serde.h
index facd6aaa72..9a47f8fbd7 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.h
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.h
@@ -41,6 +41,13 @@ public:
int32_t col_id, int row_num) const override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
+ void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override;
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override {
+ LOG(FATAL) << "Not support read hll column from arrow";
+ }
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_map_serde.cpp
b/be/src/vec/data_types/serde/data_type_map_serde.cpp
index 9f430e3198..87dd4e5615 100644
--- a/be/src/vec/data_types/serde/data_type_map_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_map_serde.cpp
@@ -40,5 +40,17 @@ void DataTypeMapSerDe::write_one_cell_to_jsonb(const
IColumn& column, JsonbWrite
result.writeBinary(value.data, value.size);
result.writeEndBinary();
}
+
+void DataTypeMapSerDe::write_column_to_arrow(const IColumn& column, const
UInt8* null_map,
+ arrow::ArrayBuilder*
array_builder, int start,
+ int end) const {
+ LOG(FATAL) << "Not support write " << column.get_name() << " to arrow";
+}
+
+void DataTypeMapSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int start, int end,
+ const cctz::time_zone& ctz)
const {
+ LOG(FATAL) << "Not support read " << column.get_name() << " from arrow";
+}
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_map_serde.h
b/be/src/vec/data_types/serde/data_type_map_serde.h
index dfbe55cf19..4708e36afd 100644
--- a/be/src/vec/data_types/serde/data_type_map_serde.h
+++ b/be/src/vec/data_types/serde/data_type_map_serde.h
@@ -50,6 +50,11 @@ public:
int32_t col_id, int row_num) const override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
+ void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override;
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override;
private:
DataTypeSerDeSPtr key_serde;
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
index 8072c785c9..0c7e72bfda 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -17,6 +17,7 @@
#include "data_type_nullable_serde.h"
+#include <arrow/array/array_base.h>
#include <gen_cpp/types.pb.h>
#include <algorithm>
@@ -94,5 +95,41 @@ void
DataTypeNullableSerDe::read_one_cell_from_jsonb(IColumn& column, const Json
auto& null_map_data = col.get_null_map_data();
null_map_data.push_back(0);
}
+
+/**nullable serialize to arrow
+ 1/ convert the null_map from doris to arrow null byte map
+ 2/ pass the arrow null byteamp to nested column , and call AppendValues
+**/
+void DataTypeNullableSerDe::write_column_to_arrow(const IColumn& column, const
UInt8* null_map,
+ arrow::ArrayBuilder*
array_builder, int start,
+ int end) const {
+ const auto& column_nullable = assert_cast<const ColumnNullable&>(column);
+ const PaddedPODArray<UInt8>& bytemap = column_nullable.get_null_map_data();
+ PaddedPODArray<UInt8> res;
+ if (column_nullable.has_null()) {
+ res.reserve(end - start);
+ for (size_t i = start; i < end; ++i) {
+ res.emplace_back(
+ !(bytemap)[i]); //Invert values since Arrow interprets 1
as a non-null value
+ }
+ }
+ const UInt8* arrow_null_bytemap_raw_ptr = res.empty() ? nullptr :
res.data();
+ nested_serde->write_column_to_arrow(column_nullable.get_nested_column(),
+ arrow_null_bytemap_raw_ptr,
array_builder, start, end);
+}
+
+void DataTypeNullableSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int start, int end,
+ const cctz::time_zone& ctz)
const {
+ auto& col = reinterpret_cast<ColumnNullable&>(column);
+ NullMap& map_data = col.get_null_map_data();
+ for (size_t i = start; i < end; ++i) {
+ auto is_null = arrow_array->IsNull(i);
+ map_data.emplace_back(is_null);
+ }
+ return nested_serde->read_column_from_arrow(col.get_nested_column(),
arrow_array, start, end,
+ ctz);
+}
+
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h
b/be/src/vec/data_types/serde/data_type_nullable_serde.h
index 3975675715..7631ed90ef 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h
@@ -43,6 +43,11 @@ public:
int32_t col_id, int row_num) const override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
+ void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override;
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override;
private:
DataTypeSerDeSPtr nested_serde;
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index f6667e8930..d8736d30a8 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -17,6 +17,114 @@
#include "data_type_number_serde.h"
+#include <arrow/builder.h>
+
+#include <type_traits>
+
+#include "gutil/casts.h"
+
namespace doris {
-namespace vectorized {} // namespace vectorized
+namespace vectorized {
+
+// Type map的基本结构
+template <typename Key, typename Value, typename... Rest>
+struct TypeMap {
+ using KeyType = Key;
+ using ValueType = Value;
+ using Next = TypeMap<Rest...>;
+};
+
+// Type map的末端
+template <>
+struct TypeMap<void, void> {};
+
+// TypeMapLookup 前向声明
+template <typename Key, typename Map>
+struct TypeMapLookup;
+
+// Type map查找:找到匹配的键时的情况
+template <typename Key, typename Value, typename... Rest>
+struct TypeMapLookup<Key, TypeMap<Key, Value, Rest...>> {
+ using ValueType = Value;
+};
+
+// Type map查找:递归查找
+template <typename Key, typename K, typename V, typename... Rest>
+struct TypeMapLookup<Key, TypeMap<K, V, Rest...>> {
+ using ValueType = typename TypeMapLookup<Key, TypeMap<Rest...>>::ValueType;
+};
+
+using DORIS_NUMERIC_ARROW_BUILDER =
+ TypeMap<UInt8, arrow::BooleanBuilder, Int8, arrow::Int8Builder, UInt16,
+ arrow::UInt16Builder, Int16, arrow::Int16Builder, UInt32,
arrow::UInt32Builder,
+ Int32, arrow::Int32Builder, UInt64, arrow::UInt64Builder,
Int64,
+ arrow::Int64Builder, UInt128, arrow::FixedSizeBinaryBuilder,
Int128,
+ arrow::FixedSizeBinaryBuilder, Float32, arrow::FloatBuilder,
Float64,
+ arrow::DoubleBuilder, void,
+ void // 添加这一行来表示TypeMap的末端
+ >;
+
+template <typename T>
+void DataTypeNumberSerDe<T>::write_column_to_arrow(const IColumn& column,
const UInt8* null_map,
+ arrow::ArrayBuilder*
array_builder, int start,
+ int end) const {
+ auto& col_data = assert_cast<const ColumnType&>(column).get_data();
+ using ARROW_BUILDER_TYPE = typename TypeMapLookup<T,
DORIS_NUMERIC_ARROW_BUILDER>::ValueType;
+ if constexpr (std::is_same_v<T, UInt8>) {
+ ARROW_BUILDER_TYPE& builder =
assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
+ checkArrowStatus(
+ builder.AppendValues(reinterpret_cast<const
uint8_t*>(col_data.data() + start),
+ end - start, reinterpret_cast<const
uint8_t*>(null_map)),
+ column.get_name(), array_builder->type()->name());
+ } else if constexpr (std::is_same_v<T, Int128> || std::is_same_v<T,
UInt128>) {
+ ARROW_BUILDER_TYPE& builder =
assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
+ size_t fixed_length = sizeof(typename ColumnType::value_type);
+ const uint8_t* data_start =
+ reinterpret_cast<const uint8_t*>(col_data.data()) + start *
fixed_length;
+ checkArrowStatus(builder.AppendValues(data_start, end - start,
+ reinterpret_cast<const
uint8_t*>(null_map)),
+ column.get_name(), array_builder->type()->name());
+ } else {
+ ARROW_BUILDER_TYPE& builder =
assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
+ checkArrowStatus(builder.AppendValues(col_data.data() + start, end -
start,
+ reinterpret_cast<const
uint8_t*>(null_map)),
+ column.get_name(), array_builder->type()->name());
+ }
+}
+
+template <typename T>
+void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
+ const arrow::Array*
arrow_array, int start,
+ int end, const
cctz::time_zone& ctz) const {
+ int row_count = end - start;
+ auto& col_data = static_cast<ColumnVector<T>&>(column).get_data();
+
+ // now uint8 for bool
+ if constexpr (std::is_same_v<T, UInt8>) {
+ auto concrete_array = down_cast<const
arrow::BooleanArray*>(arrow_array);
+ for (size_t bool_i = 0; bool_i !=
static_cast<size_t>(concrete_array->length()); ++bool_i) {
+ col_data.emplace_back(concrete_array->Value(bool_i));
+ }
+ return;
+ }
+ /// buffers[0] is a null bitmap and buffers[1] are actual values
+ std::shared_ptr<arrow::Buffer> buffer = arrow_array->data()->buffers[1];
+ const auto* raw_data = reinterpret_cast<const T*>(buffer->data()) + start;
+ col_data.insert(raw_data, raw_data + row_count);
+}
+
+/// Explicit template instantiations - to avoid code bloat in headers.
+template class DataTypeNumberSerDe<UInt8>;
+template class DataTypeNumberSerDe<UInt16>;
+template class DataTypeNumberSerDe<UInt32>;
+template class DataTypeNumberSerDe<UInt64>;
+template class DataTypeNumberSerDe<UInt128>;
+template class DataTypeNumberSerDe<Int8>;
+template class DataTypeNumberSerDe<Int16>;
+template class DataTypeNumberSerDe<Int32>;
+template class DataTypeNumberSerDe<Int64>;
+template class DataTypeNumberSerDe<Int128>;
+template class DataTypeNumberSerDe<Float32>;
+template class DataTypeNumberSerDe<Float64>;
+} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h
b/be/src/vec/data_types/serde/data_type_number_serde.h
index 62632b946d..90566510ca 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.h
+++ b/be/src/vec/data_types/serde/data_type_number_serde.h
@@ -41,6 +41,12 @@ class JsonbOutStream;
namespace vectorized {
class Arena;
+// special data type using, maybe has various serde actions, so use specific
date serde
+// DataTypeDateV2 => T:UInt32
+// DataTypeDateTimeV2 => T:UInt64
+// DataTypeTime => T:Float64
+// DataTypeDate => T:Int64
+// DataTypeDateTime => T:Int64
template <typename T>
class DataTypeNumberSerDe : public DataTypeSerDe {
static_assert(IsNumber<T>);
@@ -55,6 +61,12 @@ public:
int32_t col_id, int row_num) const override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
+
+ void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override;
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override;
};
template <typename T>
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h
b/be/src/vec/data_types/serde/data_type_object_serde.h
index c72deb59ea..d8c93b9afe 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.h
+++ b/be/src/vec/data_types/serde/data_type_object_serde.h
@@ -51,6 +51,16 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override {
LOG(FATAL) << "Not support write json object to column";
}
+
+ void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override {
+ LOG(FATAL) << "Not support write object column to arrow";
+ }
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override {
+ LOG(FATAL) << "Not support read object column from arrow";
+ }
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
index bf5912446b..dc2031c5ef 100644
--- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
+++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
@@ -47,6 +47,15 @@ public:
int32_t col_id, int row_num) const override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
+ void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override {
+ LOG(FATAL) << "Not support write " << column.get_name() << " to arrow";
+ }
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override {
+ LOG(FATAL) << "Not support read " << column.get_name() << " from
arrow";
+ }
};
template <typename T>
diff --git a/be/src/vec/data_types/serde/data_type_serde.h
b/be/src/vec/data_types/serde/data_type_serde.h
index 5908c0836e..2d196fcb9e 100644
--- a/be/src/vec/data_types/serde/data_type_serde.h
+++ b/be/src/vec/data_types/serde/data_type_serde.h
@@ -22,8 +22,19 @@
#include <memory>
#include <vector>
+#include "arrow/status.h"
#include "common/status.h"
#include "util/jsonb_writer.h"
+#include "vec/common/pod_array_fwd.h"
+#include "vec/core/types.h"
+
+namespace arrow {
+class ArrayBuilder;
+class Array;
+} // namespace arrow
+namespace cctz {
+class time_zone;
+} // namespace cctz
namespace doris {
class PValues;
@@ -70,8 +81,21 @@ public:
// JSON serializer and deserializer
// Arrow serializer and deserializer
+ virtual void write_column_to_arrow(const IColumn& column, const UInt8*
null_map,
+ arrow::ArrayBuilder* array_builder, int
start,
+ int end) const = 0;
+ virtual void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz)
const = 0;
};
+inline void checkArrowStatus(const arrow::Status& status, const std::string&
column,
+ const std::string& format_name) {
+ if (!status.ok()) {
+ LOG(FATAL) << "arrow serde with arrow: " << format_name << " with
column : " << column
+ << " with error msg: " << status.ToString();
+ }
+}
+
using DataTypeSerDeSPtr = std::shared_ptr<DataTypeSerDe>;
using DataTypeSerDeSPtrs = std::vector<DataTypeSerDeSPtr>;
diff --git a/be/src/vec/data_types/serde/data_type_string_serde.cpp
b/be/src/vec/data_types/serde/data_type_string_serde.cpp
index 081b9b1748..ba230da987 100644
--- a/be/src/vec/data_types/serde/data_type_string_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_string_serde.cpp
@@ -21,7 +21,10 @@
#include <gen_cpp/types.pb.h>
#include <stddef.h>
+#include "arrow/array/builder_binary.h"
+#include "gutil/casts.h"
#include "util/jsonb_document.h"
+#include "util/jsonb_utils.h"
#include "vec/columns/column.h"
#include "vec/columns/column_string.h"
#include "vec/common/string_ref.h"
@@ -63,5 +66,62 @@ void DataTypeStringSerDe::read_one_cell_from_jsonb(IColumn&
column, const JsonbV
auto blob = static_cast<const JsonbBlobVal*>(arg);
col.insert_data(blob->getBlob(), blob->getBlobLen());
}
+
+void DataTypeStringSerDe::write_column_to_arrow(const IColumn& column, const
UInt8* null_map,
+ arrow::ArrayBuilder*
array_builder, int start,
+ int end) const {
+ const auto& string_column = assert_cast<const ColumnString&>(column);
+ auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
+ for (size_t string_i = start; string_i < end; ++string_i) {
+ if (null_map && null_map[string_i]) {
+ checkArrowStatus(builder.AppendNull(), column.get_name(),
+ array_builder->type()->name());
+ continue;
+ }
+ std::string_view string_ref =
string_column.get_data_at(string_i).to_string_view();
+ if (column.get_data_type() == TypeIndex::JSONB) {
+ std::string json_string =
+ JsonbToJson::jsonb_to_json_string(string_ref.data(),
string_ref.size());
+ checkArrowStatus(builder.Append(json_string.data(),
json_string.size()),
+ column.get_name(), array_builder->type()->name());
+ } else {
+ checkArrowStatus(builder.Append(string_ref.data(),
string_ref.size()),
+ column.get_name(), array_builder->type()->name());
+ }
+ }
+}
+
+void DataTypeStringSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int start, int end,
+ const cctz::time_zone& ctz)
const {
+ auto& column_chars_t = assert_cast<ColumnString&>(column).get_chars();
+ auto& column_offsets = assert_cast<ColumnString&>(column).get_offsets();
+ if (arrow_array->type_id() == arrow::Type::STRING ||
+ arrow_array->type_id() == arrow::Type::BINARY) {
+ auto concrete_array = down_cast<const
arrow::BinaryArray*>(arrow_array);
+ std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();
+
+ for (size_t offset_i = start; offset_i < end; ++offset_i) {
+ if (!concrete_array->IsNull(offset_i)) {
+ const auto* raw_data = buffer->data() +
concrete_array->value_offset(offset_i);
+ column_chars_t.insert(raw_data, raw_data +
concrete_array->value_length(offset_i));
+ }
+ column_offsets.emplace_back(column_chars_t.size());
+ }
+ } else if (arrow_array->type_id() == arrow::Type::FIXED_SIZE_BINARY) {
+ auto concrete_array = down_cast<const
arrow::FixedSizeBinaryArray*>(arrow_array);
+ uint32_t width = concrete_array->byte_width();
+ const auto* array_data = concrete_array->GetValue(start);
+
+ for (size_t offset_i = 0; offset_i < end - start; ++offset_i) {
+ if (!concrete_array->IsNull(offset_i)) {
+ const auto* raw_data = array_data + (offset_i * width);
+ column_chars_t.insert(raw_data, raw_data + width);
+ }
+ column_offsets.emplace_back(column_chars_t.size());
+ }
+ }
+}
+
} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h
b/be/src/vec/data_types/serde/data_type_string_serde.h
index 9893c5721e..5fe7d00db8 100644
--- a/be/src/vec/data_types/serde/data_type_string_serde.h
+++ b/be/src/vec/data_types/serde/data_type_string_serde.h
@@ -40,6 +40,12 @@ public:
int32_t col_id, int row_num) const override;
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
+
+ void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override;
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.cpp
b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
index 2af87dd484..be21680009 100644
--- a/be/src/vec/data_types/serde/data_type_struct_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
@@ -42,5 +42,17 @@ void DataTypeStructSerDe::read_one_cell_from_jsonb(IColumn&
column, const JsonbV
auto blob = static_cast<const JsonbBlobVal*>(arg);
column.deserialize_and_insert_from_arena(blob->getBlob());
}
+
+void DataTypeStructSerDe::write_column_to_arrow(const IColumn& column, const
UInt8* null_map,
+ arrow::ArrayBuilder*
array_builder, int start,
+ int end) const {
+ LOG(FATAL) << "Not support write " << column.get_name() << " to arrow";
+}
+
+void DataTypeStructSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int start, int end,
+ const cctz::time_zone& ctz)
const {
+ LOG(FATAL) << "Not support read " << column.get_name() << " from arrow";
+}
} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.h
b/be/src/vec/data_types/serde/data_type_struct_serde.h
index e6abe47b7d..836d5bdbdd 100644
--- a/be/src/vec/data_types/serde/data_type_struct_serde.h
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.h
@@ -51,6 +51,12 @@ public:
void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg)
const override;
+ void write_column_to_arrow(const IColumn& column, const UInt8* null_map,
+ arrow::ArrayBuilder* array_builder, int start,
+ int end) const override;
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override;
+
private:
DataTypeSerDeSPtrs elemSerDeSPtrs;
};
diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp
b/be/src/vec/utils/arrow_column_to_doris_column.cpp
index af94664f1b..5de57156dd 100644
--- a/be/src/vec/utils/arrow_column_to_doris_column.cpp
+++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp
@@ -71,19 +71,6 @@
M(::arrow::Type::DATE64, TYPE_DATETIME) \
M(::arrow::Type::DECIMAL, TYPE_DECIMALV2)
-#define FOR_ARROW_NUMERIC_TYPES(M) \
- M(arrow::Type::UINT8, UInt8) \
- M(arrow::Type::INT8, Int8) \
- M(arrow::Type::INT16, Int16) \
- M(arrow::Type::UINT16, UInt16) \
- M(arrow::Type::INT32, Int32) \
- M(arrow::Type::UINT32, UInt32) \
- M(arrow::Type::UINT64, UInt64) \
- M(arrow::Type::INT64, Int64) \
- M(arrow::Type::HALF_FLOAT, Float32) \
- M(arrow::Type::FLOAT, Float32) \
- M(arrow::Type::DOUBLE, Float64)
-
namespace doris::vectorized {
PrimitiveType arrow_type_to_primitive_type(::arrow::Type::type type) {
@@ -100,254 +87,7 @@ PrimitiveType
arrow_type_to_primitive_type(::arrow::Type::type type) {
return INVALID_TYPE;
}
-static size_t fill_nullable_column(const arrow::Array* array, size_t array_idx,
- vectorized::ColumnNullable* nullable_column,
- size_t num_elements) {
- size_t null_elements_count = 0;
- NullMap& map_data = nullable_column->get_null_map_data();
- for (size_t i = 0; i < num_elements; ++i) {
- auto is_null = array->IsNull(array_idx + i);
- map_data.emplace_back(is_null);
- null_elements_count += is_null;
- }
- return null_elements_count;
-}
-
-/// Inserts chars and offsets right into internal column data to reduce an
overhead.
-/// Internal offsets are shifted by one to the right in comparison with Arrow
ones. So the last offset should map to the end of all chars.
-/// Also internal strings are null terminated.
-static Status convert_column_with_string_data(const arrow::Array* array,
size_t array_idx,
- MutableColumnPtr& data_column,
size_t num_elements) {
- auto& column_chars_t =
assert_cast<ColumnString&>(*data_column).get_chars();
- auto& column_offsets =
assert_cast<ColumnString&>(*data_column).get_offsets();
-
- auto concrete_array = down_cast<const arrow::BinaryArray*>(array);
- std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();
-
- for (size_t offset_i = array_idx; offset_i < array_idx + num_elements;
++offset_i) {
- if (!concrete_array->IsNull(offset_i) && buffer) {
- const auto* raw_data = buffer->data() +
concrete_array->value_offset(offset_i);
- column_chars_t.insert(raw_data, raw_data +
concrete_array->value_length(offset_i));
- }
-
- column_offsets.emplace_back(column_chars_t.size());
- }
- return Status::OK();
-}
-
-static Status convert_column_with_fixed_size_data(const arrow::Array* array,
size_t array_idx,
- MutableColumnPtr&
data_column,
- size_t num_elements) {
- auto& column_chars_t =
assert_cast<ColumnString&>(*data_column).get_chars();
- auto& column_offsets =
assert_cast<ColumnString&>(*data_column).get_offsets();
-
- auto concrete_array = down_cast<const arrow::FixedSizeBinaryArray*>(array);
- uint32_t width = concrete_array->byte_width();
- const auto* array_data = concrete_array->GetValue(array_idx);
-
- for (size_t offset_i = 0; offset_i < num_elements; ++offset_i) {
- if (!concrete_array->IsNull(offset_i)) {
- const auto* raw_data = array_data + (offset_i * width);
- column_chars_t.insert(raw_data, raw_data + width);
- }
- column_offsets.emplace_back(column_chars_t.size());
- }
- return Status::OK();
-}
-
-/// Inserts numeric data right into internal column data to reduce an overhead
-template <typename NumericType, typename VectorType =
ColumnVector<NumericType>>
-Status convert_column_with_numeric_data(const arrow::Array* array, size_t
array_idx,
- MutableColumnPtr& data_column, size_t
num_elements) {
- auto& column_data = static_cast<VectorType&>(*data_column).get_data();
- /// buffers[0] is a null bitmap and buffers[1] are actual values
- std::shared_ptr<arrow::Buffer> buffer = array->data()->buffers[1];
- const auto* raw_data = reinterpret_cast<const
NumericType*>(buffer->data()) + array_idx;
- column_data.insert(raw_data, raw_data + num_elements);
- return Status::OK();
-}
-
-static Status convert_column_with_boolean_data(const arrow::Array* array,
size_t array_idx,
- MutableColumnPtr& data_column,
size_t num_elements) {
- auto& column_data =
static_cast<ColumnVector<UInt8>&>(*data_column).get_data();
- auto concrete_array = down_cast<const arrow::BooleanArray*>(array);
- for (size_t bool_i = array_idx; bool_i < array_idx + num_elements;
++bool_i) {
- column_data.emplace_back(concrete_array->Value(bool_i));
- }
- return Status::OK();
-}
-
-static int64_t time_unit_divisor(arrow::TimeUnit::type unit) {
- // Doris only supports seconds
- switch (unit) {
- case arrow::TimeUnit::type::SECOND: {
- return 1L;
- }
- case arrow::TimeUnit::type::MILLI: {
- return 1000L;
- }
- case arrow::TimeUnit::type::MICRO: {
- return 1000000L;
- }
- case arrow::TimeUnit::type::NANO: {
- return 1000000000L;
- }
- default:
- return 0L;
- }
-}
-
-template <typename ArrowType>
-Status convert_column_with_timestamp_data(const arrow::Array* array, size_t
array_idx,
- MutableColumnPtr& data_column,
size_t num_elements,
- const cctz::time_zone& ctz) {
- auto& column_data =
static_cast<ColumnVector<Int64>&>(*data_column).get_data();
- auto concrete_array = down_cast<const ArrowType*>(array);
- int64_t divisor = 1;
- int64_t multiplier = 1;
- if constexpr (std::is_same_v<ArrowType, arrow::TimestampArray>) {
- const auto type =
std::static_pointer_cast<arrow::TimestampType>(array->type());
- divisor = time_unit_divisor(type->unit());
- if (divisor == 0L) {
- return Status::InternalError(fmt::format("Invalid Time Type:{}",
type->name()));
- }
- } else if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) {
- multiplier = 24 * 60 * 60; // day => secs
- } else if constexpr (std::is_same_v<ArrowType, arrow::Date64Array>) {
- divisor = 1000; //ms => secs
- }
-
- for (size_t value_i = array_idx; value_i < array_idx + num_elements;
++value_i) {
- VecDateTimeValue v;
- v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) /
divisor * multiplier,
- ctz);
- if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) {
- v.cast_to_date();
- }
- column_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
- }
- return Status::OK();
-}
-
-template <typename ArrowType>
-Status convert_column_with_date_v2_data(const arrow::Array* array, size_t
array_idx,
- MutableColumnPtr& data_column, size_t
num_elements,
- const cctz::time_zone& ctz) {
- auto& column_data =
static_cast<ColumnVector<UInt32>&>(*data_column).get_data();
- auto concrete_array = down_cast<const ArrowType*>(array);
- int64_t divisor = 1;
- int64_t multiplier = 1;
- if constexpr (std::is_same_v<ArrowType, arrow::TimestampArray>) {
- const auto type =
std::static_pointer_cast<arrow::TimestampType>(array->type());
- divisor = time_unit_divisor(type->unit());
- if (divisor == 0L) {
- return Status::InternalError(fmt::format("Invalid Time Type:{}",
type->name()));
- }
- } else if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) {
- multiplier = 24 * 60 * 60; // day => secs
- } else if constexpr (std::is_same_v<ArrowType, arrow::Date64Array>) {
- divisor = 1000; //ms => secs
- }
-
- for (size_t value_i = array_idx; value_i < array_idx + num_elements;
++value_i) {
- DateV2Value<DateV2ValueType> v;
- v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) /
divisor * multiplier,
- ctz);
- column_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>,
UInt32>(v));
- }
- return Status::OK();
-}
-
-template <typename ArrowType>
-Status convert_column_with_datetime_v2_data(const arrow::Array* array, size_t
array_idx,
- MutableColumnPtr& data_column,
size_t num_elements,
- const cctz::time_zone& ctz) {
- auto& column_data =
static_cast<ColumnVector<UInt64>&>(*data_column).get_data();
- auto concrete_array = down_cast<const ArrowType*>(array);
- int64_t divisor = 1;
- int64_t multiplier = 1;
- if constexpr (std::is_same_v<ArrowType, arrow::TimestampArray>) {
- const auto type =
std::static_pointer_cast<arrow::TimestampType>(array->type());
- divisor = time_unit_divisor(type->unit());
- if (divisor == 0L) {
- return Status::InternalError(fmt::format("Invalid Time Type:{}",
type->name()));
- }
- } else if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) {
- multiplier = 24 * 60 * 60; // day => secs
- } else if constexpr (std::is_same_v<ArrowType, arrow::Date64Array>) {
- divisor = 1000; //ms => secs
- }
-
- for (size_t value_i = array_idx; value_i < array_idx + num_elements;
++value_i) {
- DateV2Value<DateTimeV2ValueType> v;
- v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) /
divisor * multiplier,
- ctz);
- column_data.emplace_back(binary_cast<DateV2Value<DateTimeV2ValueType>,
UInt64>(v));
- }
- return Status::OK();
-}
-
-static Status convert_column_with_decimal_data(const arrow::Array* array,
size_t array_idx,
- MutableColumnPtr& data_column,
size_t num_elements) {
- auto& column_data =
-
static_cast<ColumnDecimal<vectorized::Decimal128>&>(*data_column).get_data();
- auto concrete_array = down_cast<const arrow::DecimalArray*>(array);
- const auto* arrow_decimal_type =
static_cast<arrow::DecimalType*>(array->type().get());
- // TODO check precision
- //size_t precision = arrow_decimal_type->precision();
- const auto scale = arrow_decimal_type->scale();
-
- for (size_t value_i = array_idx; value_i < array_idx + num_elements;
++value_i) {
- auto value =
- *reinterpret_cast<const
vectorized::Decimal128*>(concrete_array->Value(value_i));
- // convert scale to 9
- if (scale != 9) {
- value =
convert_decimals<vectorized::DataTypeDecimal<vectorized::Decimal128>,
-
vectorized::DataTypeDecimal<vectorized::Decimal128>>(value,
-
scale, 9);
- }
- column_data.emplace_back(value);
- }
- return Status::OK();
-}
-
-static Status convert_offset_from_list_column(const arrow::Array* array,
size_t array_idx,
- MutableColumnPtr& data_column,
size_t num_elements,
- size_t* start_idx_for_data,
size_t* num_for_data) {
- auto& offsets_data = static_cast<ColumnArray&>(*data_column).get_offsets();
- auto concrete_array = down_cast<const arrow::ListArray*>(array);
- auto arrow_offsets_array = concrete_array->offsets();
- auto arrow_offsets =
down_cast<arrow::Int32Array*>(arrow_offsets_array.get());
- auto prev_size = offsets_data.back();
- for (int64_t i = array_idx + 1; i < array_idx + num_elements + 1; ++i) {
- // convert to doris offset, start from offsets.back()
- offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) -
- arrow_offsets->Value(array_idx));
- }
- *start_idx_for_data = arrow_offsets->Value(array_idx);
- *num_for_data = offsets_data.back() - prev_size;
-
- return Status::OK();
-}
-
-static Status convert_column_with_list_data(const arrow::Array* array, size_t
array_idx,
- MutableColumnPtr& data_column,
size_t num_elements,
- const cctz::time_zone& ctz,
- const DataTypePtr& nested_type) {
- size_t start_idx_of_data = 0;
- size_t num_of_data = 0;
- // get start idx and num of values from arrow offsets
- RETURN_IF_ERROR(convert_offset_from_list_column(array, array_idx,
data_column, num_elements,
- &start_idx_of_data,
&num_of_data));
- auto& data_column_ptr =
static_cast<ColumnArray&>(*data_column).get_data_ptr();
- auto concrete_array = down_cast<const arrow::ListArray*>(array);
- std::shared_ptr<arrow::Array> arrow_data = concrete_array->values();
-
- return arrow_column_to_doris_column(arrow_data.get(), start_idx_of_data,
data_column_ptr,
- nested_type, num_of_data, ctz);
-}
-
-// For convenient unit test. Not use this in formal code.
+//// For convenient unit test. Not use this in formal code.
Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t
arrow_batch_cur_idx,
ColumnPtr& doris_column, const
DataTypePtr& type,
size_t num_elements, const std::string&
timezone) {
@@ -360,69 +100,10 @@ Status arrow_column_to_doris_column(const arrow::Array*
arrow_column, size_t arr
+// Converts `num_elements` rows of `arrow_column`, starting at row
+// `arrow_batch_cur_idx`, into `doris_column` by delegating to the serde that
+// `type->get_serde()` returns.
Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t
arrow_batch_cur_idx,
ColumnPtr& doris_column, const
DataTypePtr& type,
size_t num_elements, const
cctz::time_zone& ctz) {
- // src column always be nullable for simplify converting
- CHECK(doris_column->is_nullable());
- MutableColumnPtr data_column = nullptr;
- auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
- (*std::move(doris_column)).mutate().get());
- fill_nullable_column(arrow_column, arrow_batch_cur_idx, nullable_column,
num_elements);
- data_column = nullable_column->get_nested_column_ptr();
- WhichDataType which_type(remove_nullable(type));
- // process data
- switch (arrow_column->type()->id()) {
- case arrow::Type::STRING:
- case arrow::Type::BINARY:
- return convert_column_with_string_data(arrow_column,
arrow_batch_cur_idx, data_column,
- num_elements);
- case arrow::Type::FIXED_SIZE_BINARY:
- return convert_column_with_fixed_size_data(arrow_column,
arrow_batch_cur_idx, data_column,
- num_elements);
-#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
- case ARROW_NUMERIC_TYPE: \
- return convert_column_with_numeric_data<CPP_NUMERIC_TYPE>( \
- arrow_column, arrow_batch_cur_idx, data_column, num_elements);
- FOR_ARROW_NUMERIC_TYPES(DISPATCH)
-#undef DISPATCH
- case arrow::Type::BOOL:
- return convert_column_with_boolean_data(arrow_column,
arrow_batch_cur_idx, data_column,
- num_elements);
- case arrow::Type::DATE32:
- if (which_type.is_date_v2()) {
- return convert_column_with_date_v2_data<arrow::Date32Array>(
- arrow_column, arrow_batch_cur_idx, data_column,
num_elements, ctz);
- } else {
- return convert_column_with_timestamp_data<arrow::Date32Array>(
- arrow_column, arrow_batch_cur_idx, data_column,
num_elements, ctz);
- }
- case arrow::Type::DATE64:
- if (which_type.is_date_v2_or_datetime_v2()) {
- return convert_column_with_datetime_v2_data<arrow::Date64Array>(
- arrow_column, arrow_batch_cur_idx, data_column,
num_elements, ctz);
- } else {
- return convert_column_with_timestamp_data<arrow::Date64Array>(
- arrow_column, arrow_batch_cur_idx, data_column,
num_elements, ctz);
- }
- case arrow::Type::TIMESTAMP:
- if (which_type.is_date_v2_or_datetime_v2()) {
- return convert_column_with_datetime_v2_data<arrow::TimestampArray>(
- arrow_column, arrow_batch_cur_idx, data_column,
num_elements, ctz);
- } else {
- return convert_column_with_timestamp_data<arrow::TimestampArray>(
- arrow_column, arrow_batch_cur_idx, data_column,
num_elements, ctz);
- }
- case arrow::Type::DECIMAL:
- return convert_column_with_decimal_data(arrow_column,
arrow_batch_cur_idx, data_column,
- num_elements);
- case arrow::Type::LIST:
- CHECK(type->have_subtypes());
- return convert_column_with_list_data(
- arrow_column, arrow_batch_cur_idx, data_column, num_elements,
ctz,
- (reinterpret_cast<const
DataTypeArray*>(type.get()))->get_nested_type());
- default:
- break;
- }
- return Status::NotSupported(
- fmt::format("Not support arrow type:{}",
arrow_column->type()->name()));
+    // NOTE(review): read_column_from_arrow() is called as a bare statement and
+    // nothing is checked afterwards, so this overload always returns
+    // Status::OK(). The previous implementation returned Status::NotSupported
+    // for arrow types it did not handle; confirm the serde reports such
+    // failures some other way.
+    // NOTE(review): doris_column is handed over through assume_mutable_ref()
+    // rather than a copy-on-write mutate(), so it is presumably filled in place
+    // even when shared; the end argument (arrow_batch_cur_idx + num_elements)
+    // is presumably exclusive — confirm both against the serde implementation.
+
type->get_serde()->read_column_from_arrow(doris_column->assume_mutable_ref(),
arrow_column,
+ arrow_batch_cur_idx,
+ arrow_batch_cur_idx +
num_elements, ctz);
+ return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index d543c48570..50cc5ad6c4 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -228,7 +228,8 @@ set(VEC_TEST_FILES
vec/columns/column_decimal_test.cpp
vec/columns/column_fixed_length_object_test.cpp
vec/data_types/complex_type_test.cpp
- vec/data_types/serde/data_type_serde_test.cpp
+ vec/data_types/serde/data_type_serde_pb_test.cpp
+ vec/data_types/serde/data_type_serde_arrow_test.cpp
vec/core/block_test.cpp
vec/core/block_spill_test.cpp
vec/core/column_array_test.cpp
diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
new file mode 100644
index 0000000000..e7917e7cea
--- /dev/null
+++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
@@ -0,0 +1,321 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/array/builder_base.h>
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_decimal.h>
+#include <arrow/array/builder_nested.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/record_batch.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <arrow/util/decimal.h>
+#include <arrow/visit_type_inline.h>
+#include <arrow/visitor.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/types.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <math.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest_pred_impl.h"
+#include "olap/hll.h"
+#include "runtime/descriptors.cpp"
+#include "runtime/descriptors.h"
+#include "util/arrow/block_convertor.h"
+#include "util/arrow/row_batch.h"
+#include "util/bitmap_value.h"
+#include "util/quantile_state.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/core/block.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_date.h"
+#include "vec/data_types/data_type_date_time.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_time_v2.h"
+#include "vec/runtime/vdatetime_value.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+namespace doris::vectorized {
+
+// Appends every string of `strings`, parsed with DateValue::from_date_str, to
+// `dst` (a ColumnVector<Storage>). The parsed value is stored by reinterpreting
+// its bytes as Storage, the same way the source block below is populated.
+template <typename DateValue, typename Storage>
+static void append_parsed_dates(const ColumnString& strings, IColumn& dst) {
+    auto& dst_data = static_cast<ColumnVector<Storage>&>(dst).get_data();
+    for (size_t i = 0; i < strings.size(); ++i) {
+        StringRef str = strings.get_data_at(i);
+        DateValue value;
+        value.from_date_str(str.data, str.size);
+        dst_data.push_back(*reinterpret_cast<Storage*>(&value));
+    }
+}
+
+// Reads `num_rows` rows of `array` into a temporary DataTypeString column and
+// re-parses them into `dst`. Used for the date columns, which this test reads
+// back from the arrow batch as strings.
+template <typename DateValue, typename Storage>
+static void read_dates_via_strings(const arrow::Array* array, const std::string& name,
+                                   size_t num_rows, IColumn& dst) {
+    auto strcol = ColumnString::create();
+    DataTypePtr data_type(std::make_shared<DataTypeString>());
+    ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type, name);
+    // type_and_name.column refers to the same column object as strcol and is
+    // filled in place, so the decoded strings are read back through strcol.
+    EXPECT_EQ(arrow_column_to_doris_column(array, 0, type_and_name.column, type_and_name.type,
+                                           num_rows, "UTC"),
+              Status::OK());
+    append_parsed_dates<DateValue, Storage>(*strcol, dst);
+}
+
+// Round-trip test: builds a block with one column per entry of `cols`, converts
+// it to an arrow RecordBatch, converts each arrow column back into an empty
+// clone of the block and expects both blocks to dump the same data.
+void serialize_and_deserialize_arrow_test() {
+    vectorized::Block block;
+    // Each entry: column name, olap field type, column unique id, primitive type, nullable.
+    std::vector<std::tuple<std::string, FieldType, int, PrimitiveType, bool>> cols {
+            {"k1", FieldType::OLAP_FIELD_TYPE_INT, 1, TYPE_INT, false},
+            {"k7", FieldType::OLAP_FIELD_TYPE_INT, 7, TYPE_INT, true},
+            {"k2", FieldType::OLAP_FIELD_TYPE_STRING, 2, TYPE_STRING, false},
+            {"k3", FieldType::OLAP_FIELD_TYPE_DECIMAL128I, 3, TYPE_DECIMAL128I, false},
+            {"k11", FieldType::OLAP_FIELD_TYPE_DATETIME, 11, TYPE_DATETIME, false},
+            {"k4", FieldType::OLAP_FIELD_TYPE_BOOL, 4, TYPE_BOOLEAN, false}};
+    const int row_num = 7;
+    // make desc and generate block
+    TupleDescriptor tuple_desc(PTupleDescriptor(), true);
+    for (const auto& t : cols) {
+        TSlotDescriptor tslot;
+        const std::string& col_name = std::get<0>(t);
+        tslot.__set_colName(col_name);
+        TypeDescriptor type_desc(std::get<3>(t));
+        const bool is_nullable = std::get<4>(t);
+        switch (std::get<3>(t)) {
+        case TYPE_BOOLEAN:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto vec = vectorized::ColumnVector<UInt8>::create();
+                auto& data = vec->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    data.push_back(i % 2);
+                }
+                vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeUInt8>());
+                vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type,
+                                                                col_name);
+                block.insert(std::move(type_and_name));
+            }
+            break;
+        case TYPE_INT:
+            tslot.__set_slotType(type_desc.to_thrift());
+            if (is_nullable) {
+                auto column_vector_int32 = vectorized::ColumnVector<Int32>::create();
+                auto column_nullable_vector =
+                        vectorized::make_nullable(std::move(column_vector_int32));
+                auto mutable_nullable_vector = std::move(*column_nullable_vector).mutate();
+                for (int i = 0; i < row_num; i++) {
+                    mutable_nullable_vector->insert(int32(i));
+                }
+                auto data_type = vectorized::make_nullable(
+                        std::make_shared<vectorized::DataTypeInt32>());
+                vectorized::ColumnWithTypeAndName type_and_name(
+                        mutable_nullable_vector->get_ptr(), data_type, col_name);
+                block.insert(type_and_name);
+            } else {
+                auto vec = vectorized::ColumnVector<Int32>::create();
+                auto& data = vec->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    data.push_back(i);
+                }
+                vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>());
+                vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type,
+                                                                col_name);
+                block.insert(std::move(type_and_name));
+            }
+            break;
+        case TYPE_DECIMAL128I:
+            type_desc.precision = 27;
+            type_desc.scale = 9;
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                vectorized::DataTypePtr decimal_data_type(
+                        doris::vectorized::create_decimal(27, 9, true));
+                auto decimal_column = decimal_data_type->create_column();
+                auto& data = ((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)
+                                      decimal_column.get())
+                                     ->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    __int128_t value = i * pow(10, 9) + i * pow(10, 8);
+                    data.push_back(value);
+                }
+                vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(),
+                                                                decimal_data_type, col_name);
+                block.insert(type_and_name);
+            }
+            break;
+        case TYPE_STRING:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto strcol = vectorized::ColumnString::create();
+                for (int i = 0; i < row_num; ++i) {
+                    std::string is = std::to_string(i);
+                    strcol->insert_data(is.c_str(), is.size());
+                }
+                vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>());
+                vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type,
+                                                                col_name);
+                block.insert(type_and_name);
+            }
+            break;
+        case TYPE_HLL:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                vectorized::DataTypePtr hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
+                auto hll_column = hll_data_type->create_column();
+                std::vector<HyperLogLog>& container =
+                        ((vectorized::ColumnHLL*)hll_column.get())->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    HyperLogLog hll;
+                    hll.update(i);
+                    container.push_back(hll);
+                }
+                vectorized::ColumnWithTypeAndName type_and_name(hll_column->get_ptr(),
+                                                                hll_data_type, col_name);
+
+                block.insert(type_and_name);
+            }
+            break;
+        case TYPE_DATEV2:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto column_vector_date_v2 = vectorized::ColumnVector<vectorized::UInt32>::create();
+                auto& date_v2_data = column_vector_date_v2->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    vectorized::DateV2Value<doris::vectorized::DateV2ValueType> value;
+                    value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
+                    date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
+                }
+                vectorized::DataTypePtr date_v2_type(
+                        std::make_shared<vectorized::DataTypeDateV2>());
+                vectorized::ColumnWithTypeAndName test_date_v2(column_vector_date_v2->get_ptr(),
+                                                               date_v2_type, col_name);
+                block.insert(test_date_v2);
+            }
+            break;
+        case TYPE_DATE: // int64
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto column_vector_date = vectorized::ColumnVector<vectorized::Int64>::create();
+                auto& date_data = column_vector_date->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    vectorized::VecDateTimeValue value;
+                    value.from_date_int64(20210501);
+                    date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
+                }
+                vectorized::DataTypePtr date_type(std::make_shared<vectorized::DataTypeDate>());
+                vectorized::ColumnWithTypeAndName test_date(column_vector_date->get_ptr(),
+                                                            date_type, col_name);
+                block.insert(test_date);
+            }
+            break;
+        case TYPE_DATETIME: // int64
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto column_vector_datetime = vectorized::ColumnVector<vectorized::Int64>::create();
+                auto& datetime_data = column_vector_datetime->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    vectorized::VecDateTimeValue value;
+                    value.from_date_int64(20210501080910);
+                    datetime_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
+                }
+                vectorized::DataTypePtr datetime_type(
+                        std::make_shared<vectorized::DataTypeDateTime>());
+                vectorized::ColumnWithTypeAndName test_datetime(column_vector_datetime->get_ptr(),
+                                                                datetime_type, col_name);
+                block.insert(test_datetime);
+            }
+            break;
+        default:
+            break;
+        }
+
+        tslot.__set_col_unique_id(std::get<2>(t));
+        // NOTE(review): the slot is handed to tuple_desc, which was constructed with
+        // `true` as its second argument; presumably it then owns and frees the slot.
+        SlotDescriptor* slot = new SlotDescriptor(tslot);
+        tuple_desc.add_slot(slot);
+    }
+
+    RowDescriptor row_desc(&tuple_desc, true);
+    // arrow schema
+    std::shared_ptr<arrow::Schema> arrow_schema;
+    // ASSERT (not EXPECT): on failure arrow_schema may stay null and is
+    // dereferenced right below.
+    ASSERT_EQ(convert_to_arrow_schema(row_desc, &arrow_schema), Status::OK());
+
+    // serialize
+    std::shared_ptr<arrow::RecordBatch> result;
+    std::cout << "block structure: " << block.dump_structure() << std::endl;
+    std::cout << "_arrow_schema: " << arrow_schema->ToString(true) << std::endl;
+    ASSERT_EQ(convert_to_arrow_batch(block, arrow_schema, arrow::default_memory_pool(), &result),
+              Status::OK());
+    Block new_block = block.clone_empty();
+    // deserialize
+    for (const auto& t : cols) {
+        const std::string& real_column_name = std::get<0>(t);
+        const PrimitiveType primitive_type = std::get<3>(t);
+        std::shared_ptr<arrow::Array> array = result->GetColumnByName(real_column_name);
+        ASSERT_TRUE(array != nullptr) << "arrow batch has no column " << real_column_name;
+        auto& column_with_type_and_name = new_block.get_by_name(real_column_name);
+        if (primitive_type == PrimitiveType::TYPE_DATE ||
+            primitive_type == PrimitiveType::TYPE_DATETIME) {
+            read_dates_via_strings<VecDateTimeValue, Int64>(
+                    array.get(), real_column_name, block.rows(),
+                    column_with_type_and_name.column.get()->assume_mutable_ref());
+        } else if (primitive_type == PrimitiveType::TYPE_DATEV2) {
+            read_dates_via_strings<DateV2Value<DateV2ValueType>, UInt32>(
+                    array.get(), real_column_name, block.rows(),
+                    column_with_type_and_name.column.get()->assume_mutable_ref());
+        } else {
+            EXPECT_EQ(arrow_column_to_doris_column(array.get(), 0,
+                                                   column_with_type_and_name.column,
+                                                   column_with_type_and_name.type, block.rows(),
+                                                   "UTC"),
+                      Status::OK());
+        }
+    }
+
+    std::cout << block.dump_data() << std::endl;
+    std::cout << new_block.dump_data() << std::endl;
+    EXPECT_EQ(block.dump_data(), new_block.dump_data());
+}
+
+TEST(DataTypeSerDeArrowTest, DataTypeScalaSerDeTest) {
+    serialize_and_deserialize_arrow_test();
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
new file mode 100644
index 0000000000..7bed95be8d
--- /dev/null
+++ b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
@@ -0,0 +1,200 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/types.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <math.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest_pred_impl.h"
+#include "olap/hll.h"
+#include "util/bitmap_value.h"
+#include "util/quantile_state.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/serde/data_type_serde.h"
+
+namespace doris::vectorized {
+
+// Serializes every row of `col` into `result` with the serde of `data_type`.
+void column_to_pb(const DataTypePtr& data_type, const IColumn& col, PValues* result) {
+    const DataTypeSerDeSPtr serde = data_type->get_serde();
+    EXPECT_EQ(serde->write_column_to_pb(col, *result, 0, col.size()), Status::OK());
+}
+
+// Deserializes `result` into `col` with the serde of `data_type`.
+void pb_to_column(const DataTypePtr& data_type, PValues& result, IColumn& col) {
+    auto serde = data_type->get_serde();
+    EXPECT_EQ(serde->read_column_from_pb(col, result), Status::OK());
+}
+
+// Round-trip check: col -> PValues -> fresh column -> PValues, expecting both
+// PValues to produce the same DebugString().
+void check_pb_col(const DataTypePtr& data_type, const IColumn& col) {
+    PValues pv;
+    column_to_pb(data_type, col, &pv);
+    const std::string s1 = pv.DebugString();
+
+    auto col1 = data_type->create_column();
+    pb_to_column(data_type, pv, *col1);
+    PValues as_pv;
+    column_to_pb(data_type, *col1, &as_pv);
+
+    const std::string s2 = as_pv.DebugString();
+    EXPECT_EQ(s1, s2);
+}
+
+// Runs check_pb_col over plain, complex (bitmap/hll/quantile) and nullable columns.
+void serialize_and_deserialize_pb_test() {
+    // int
+    {
+        auto vec = vectorized::ColumnVector<Int32>::create();
+        auto& data = vec->get_data();
+        for (int i = 0; i < 1024; ++i) {
+            data.push_back(i);
+        }
+        vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>());
+        check_pb_col(data_type, *vec);
+    }
+    // string
+    {
+        auto strcol = vectorized::ColumnString::create();
+        for (int i = 0; i < 1024; ++i) {
+            std::string is = std::to_string(i);
+            strcol->insert_data(is.c_str(), is.size());
+        }
+        vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>());
+        check_pb_col(data_type, *strcol);
+    }
+    // decimal
+    {
+        vectorized::DataTypePtr decimal_data_type(doris::vectorized::create_decimal(27, 9, true));
+        auto decimal_column = decimal_data_type->create_column();
+        auto& data = ((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)
+                              decimal_column.get())
+                             ->get_data();
+        for (int i = 0; i < 1024; ++i) {
+            __int128_t value = i * pow(10, 9) + i * pow(10, 8);
+            data.push_back(value);
+        }
+        check_pb_col(decimal_data_type, *decimal_column);
+    }
+    // bitmap
+    {
+        vectorized::DataTypePtr bitmap_data_type(std::make_shared<vectorized::DataTypeBitMap>());
+        auto bitmap_column = bitmap_data_type->create_column();
+        std::vector<BitmapValue>& container =
+                ((vectorized::ColumnBitmap*)bitmap_column.get())->get_data();
+        for (int i = 0; i < 4; ++i) {
+            BitmapValue bv;
+            for (int j = 0; j <= i; ++j) {
+                bv.add(j);
+            }
+            container.push_back(bv);
+        }
+        check_pb_col(bitmap_data_type, *bitmap_column);
+    }
+    // hll
+    {
+        vectorized::DataTypePtr hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
+        auto hll_column = hll_data_type->create_column();
+        std::vector<HyperLogLog>& container =
+                ((vectorized::ColumnHLL*)hll_column.get())->get_data();
+        for (int i = 0; i < 4; ++i) {
+            HyperLogLog hll;
+            hll.update(i);
+            container.push_back(hll);
+        }
+        check_pb_col(hll_data_type, *hll_column);
+    }
+    // quantilestate
+    {
+        vectorized::DataTypePtr quantile_data_type(
+                std::make_shared<vectorized::DataTypeQuantileStateDouble>());
+        auto quantile_column = quantile_data_type->create_column();
+        std::vector<QuantileStateDouble>& container =
+                ((vectorized::ColumnQuantileStateDouble*)quantile_column.get())->get_data();
+        const long max_rand = 1000000L;
+        double lower_bound = 0;
+        double upper_bound = 100;
+        // A fixed seed keeps the generated values, and thus any failure, reproducible.
+        srandom(42);
+        for (int i = 0; i < 1024; ++i) {
+            QuantileStateDouble q;
+            double random_double =
+                    lower_bound + (upper_bound - lower_bound) * (random() % max_rand) / max_rand;
+            q.add_value(random_double);
+            container.push_back(q);
+        }
+        check_pb_col(quantile_data_type, *quantile_column);
+    }
+    // nullable string
+    {
+        vectorized::DataTypePtr string_data_type(std::make_shared<vectorized::DataTypeString>());
+        vectorized::DataTypePtr nullable_data_type(
+                std::make_shared<vectorized::DataTypeNullable>(string_data_type));
+        auto nullable_column = nullable_data_type->create_column();
+        ((vectorized::ColumnNullable*)nullable_column.get())->insert_null_elements(1024);
+        check_pb_col(nullable_data_type, *nullable_column);
+    }
+    // nullable decimal
+    {
+        vectorized::DataTypePtr decimal_data_type(doris::vectorized::create_decimal(27, 9, true));
+        vectorized::DataTypePtr nullable_data_type(
+                std::make_shared<vectorized::DataTypeNullable>(decimal_data_type));
+        auto nullable_column = nullable_data_type->create_column();
+        ((vectorized::ColumnNullable*)nullable_column.get())->insert_null_elements(1024);
+        check_pb_col(nullable_data_type, *nullable_column);
+    }
+    // nullable int filled from a 1024-row non-nullable column
+    {
+        auto vec = vectorized::ColumnVector<Int32>::create();
+        auto& data = vec->get_data();
+        for (int i = 0; i < 1024; ++i) {
+            data.push_back(i);
+        }
+        vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>());
+        vectorized::DataTypePtr nullable_data_type(
+                std::make_shared<vectorized::DataTypeNullable>(data_type));
+        auto nullable_column = nullable_data_type->create_column();
+        ((vectorized::ColumnNullable*)nullable_column.get())
+                ->insert_range_from_not_nullable(*vec, 0, 1024);
+        check_pb_col(nullable_data_type, *nullable_column);
+    }
+}
+
+TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTest) {
+    serialize_and_deserialize_pb_test();
+}
+
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]