This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 8f79742f7d9 branch-2.1: [fix](arrow) Fix Arrow serialization and
deserialization of Date/Datetime/Array/Map/Struct/Bitmap/HLL/Decimal256 types
(#49244)
8f79742f7d9 is described below
commit 8f79742f7d98a3252abe8a9ebdc056fd2399a56e
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Mar 20 09:57:04 2025 +0800
branch-2.1: [fix](arrow) Fix Arrow serialization and deserialization of
Date/Datetime/Array/Map/Struct/Bitmap/HLL/Decimal256 types (#49244)
### What problem does this PR solve?
pick #48944 [fix](arrow) Fix UT DataTypeSerDeArrowTest of
Array/Map/Struct/Bitmap/HLL/Decimal256 types
pick #48398 [fix](arrow) Fix UT DataTypeSerDeArrowTest of Date type
---
be/src/runtime/types.cpp | 27 +-
be/src/runtime/types.h | 3 +-
be/src/util/arrow/block_convertor.cpp | 272 +-----------
be/src/util/arrow/row_batch.cpp | 2 -
be/src/vec/columns/column_array.cpp | 1 +
be/src/vec/columns/column_map.cpp | 1 +
be/src/vec/columns/column_string.h | 4 +
be/src/vec/columns/column_struct.cpp | 1 +
be/src/vec/columns/column_vector.h | 5 +-
be/src/vec/data_types/data_type_time_v2.h | 4 +-
.../data_types/serde/data_type_date64_serde.cpp | 33 +-
.../vec/data_types/serde/data_type_date64_serde.h | 7 +
.../serde/data_type_datetimev2_serde.cpp | 5 +-
.../data_types/serde/data_type_datev2_serde.cpp | 11 +-
.../data_types/serde/data_type_decimal_serde.cpp | 45 +-
.../vec/data_types/serde/data_type_ipv6_serde.cpp | 18 +-
.../data_types/serde/data_type_number_serde.cpp | 20 +-
.../vec/data_types/common_data_type_serder_test.h | 445 +++++++++++++++++++
be/test/vec/data_types/data_type_map_test.cpp | 178 ++++++++
be/test/vec/data_types/data_type_struct_test.cpp | 115 +++++
.../serde/data_type_serde_arrow_test.cpp | 481 +++++++--------------
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 4 +-
be/test/vec/exprs/vexpr_test.cpp | 4 +-
23 files changed, 1037 insertions(+), 649 deletions(-)
diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp
index 14ba4b2cebd..f0112ebd2bc 100644
--- a/be/src/runtime/types.cpp
+++ b/be/src/runtime/types.cpp
@@ -46,12 +46,20 @@ TypeDescriptor::TypeDescriptor(const
std::vector<TTypeNode>& types, int* idx)
DCHECK(scalar_type.__isset.len);
len = scalar_type.len;
} else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type ==
TYPE_DECIMAL64 ||
- type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type
== TYPE_DATETIMEV2 ||
- type == TYPE_TIMEV2) {
+ type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
DCHECK(scalar_type.__isset.precision);
DCHECK(scalar_type.__isset.scale);
precision = scalar_type.precision;
scale = scalar_type.scale;
+ } else if (type == TYPE_DATETIMEV2) {
+ DCHECK(scalar_type.__isset.scale);
+ scale = scalar_type.scale;
+ } else if (type == TYPE_TIMEV2) {
+ if (scalar_type.__isset.scale) {
+ scale = scalar_type.scale;
+ } else {
+ scale = 0;
+ }
} else if (type == TYPE_STRING) {
if (scalar_type.__isset.len) {
len = scalar_type.len;
@@ -152,11 +160,14 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type)
const {
// DCHECK_NE(len, -1);
scalar_type.__set_len(len);
} else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type ==
TYPE_DECIMAL64 ||
- type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type
== TYPE_DATETIMEV2) {
+ type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
DCHECK_NE(precision, -1);
DCHECK_NE(scale, -1);
scalar_type.__set_precision(precision);
scalar_type.__set_scale(scale);
+ } else if (type == TYPE_DATETIMEV2) {
+ DCHECK_NE(scale, -1);
+ scalar_type.__set_scale(scale);
}
}
}
@@ -169,11 +180,14 @@ void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const {
if (type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_HLL || type
== TYPE_STRING) {
scalar_type->set_len(len);
} else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type ==
TYPE_DECIMAL64 ||
- type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type ==
TYPE_DATETIMEV2) {
+ type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
DCHECK_NE(precision, -1);
DCHECK_NE(scale, -1);
scalar_type->set_precision(precision);
scalar_type->set_scale(scale);
+ } else if (type == TYPE_DATETIMEV2) {
+ DCHECK_NE(scale, -1);
+ scalar_type->set_scale(scale);
} else if (type == TYPE_ARRAY) {
node->set_type(TTypeNodeType::ARRAY);
node->set_contains_null(contains_nulls[0]);
@@ -219,11 +233,14 @@ TypeDescriptor::TypeDescriptor(const
google::protobuf::RepeatedPtrField<PTypeNod
DCHECK(scalar_type.has_len());
len = scalar_type.len();
} else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type ==
TYPE_DECIMAL64 ||
- type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256 || type
== TYPE_DATETIMEV2) {
+ type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256) {
DCHECK(scalar_type.has_precision());
DCHECK(scalar_type.has_scale());
precision = scalar_type.precision();
scale = scalar_type.scale();
+ } else if (type == TYPE_DATETIMEV2) {
+ DCHECK(scalar_type.has_scale());
+ scale = scalar_type.scale();
} else if (type == TYPE_STRING) {
if (scalar_type.has_len()) {
len = scalar_type.len();
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 4cb7d51e4b5..c96480274fc 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -49,6 +49,7 @@ struct TypeDescriptor {
/// Only set if type == TYPE_DECIMAL
int precision;
+ /// Only set if type == TYPE_DECIMAL or type = TYPE_DATETIMEV2
int scale;
std::vector<TypeDescriptor> children;
@@ -68,11 +69,11 @@ struct TypeDescriptor {
// explicit TypeDescriptor(PrimitiveType type) :
TypeDescriptor(PrimitiveType type) : type(type), len(-1), precision(-1),
scale(-1) {
+ // TODO, should not initialize default values, force initialization by
parameters or external.
if (type == TYPE_DECIMALV2) {
precision = 27;
scale = 9;
} else if (type == TYPE_DATETIMEV2) {
- precision = 18;
scale = 6;
}
}
diff --git a/be/src/util/arrow/block_convertor.cpp
b/be/src/util/arrow/block_convertor.cpp
index 817231e02ba..4db60144ea5 100644
--- a/be/src/util/arrow/block_convertor.cpp
+++ b/be/src/util/arrow/block_convertor.cpp
@@ -66,10 +66,7 @@ class Array;
namespace doris {
-// Convert Block to an Arrow::Array
-// We should keep this function to keep compatible with arrow's type visitor
-// Now we inherit TypeVisitor to use default Visit implementation
-class FromBlockConverter : public arrow::TypeVisitor {
+class FromBlockConverter {
public:
FromBlockConverter(const vectorized::Block& block, const
std::shared_ptr<arrow::Schema>& schema,
arrow::MemoryPool* pool, const cctz::time_zone&
timezone_obj)
@@ -79,276 +76,11 @@ public:
_cur_field_idx(-1),
_timezone_obj(timezone_obj) {}
- ~FromBlockConverter() override = default;
-
- // Use base class function
- using arrow::TypeVisitor::Visit;
-
-#define PRIMITIVE_VISIT(TYPE) \
- arrow::Status Visit(const arrow::TYPE& type) override { return
_visit(type); }
-
- PRIMITIVE_VISIT(Int8Type)
- PRIMITIVE_VISIT(Int16Type)
- PRIMITIVE_VISIT(Int32Type)
- PRIMITIVE_VISIT(Int64Type)
- PRIMITIVE_VISIT(FloatType)
- PRIMITIVE_VISIT(DoubleType)
-
-#undef PRIMITIVE_VISIT
-
- // process string-transformable field
- arrow::Status Visit(const arrow::StringType& type) override {
- auto& builder = assert_cast<arrow::StringBuilder&>(*_cur_builder);
- size_t start = _cur_start;
- size_t num_rows = _cur_rows;
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = _cur_col->get_data_at(i);
- vectorized::TypeIndex type_idx =
vectorized::remove_nullable(_cur_type)->get_type_id();
- switch (type_idx) {
- case vectorized::TypeIndex::String:
- case vectorized::TypeIndex::FixedString:
- case vectorized::TypeIndex::HLL: {
- if (data_ref.size == 0) {
- // 0x01 is a magic num, not useful actually, just for
present ""
- //char* tmp_val = reinterpret_cast<char*>(0x01);
- ARROW_RETURN_NOT_OK(builder.Append(""));
- } else {
- ARROW_RETURN_NOT_OK(builder.Append(data_ref.data,
data_ref.size));
- }
- break;
- }
- case vectorized::TypeIndex::Date:
- case vectorized::TypeIndex::DateTime: {
- char buf[64];
- const VecDateTimeValue* time_val = (const
VecDateTimeValue*)(data_ref.data);
- int len = time_val->to_buffer(buf);
- ARROW_RETURN_NOT_OK(builder.Append(buf, len));
- break;
- }
- case vectorized::TypeIndex::DateV2: {
- char buf[64];
- const DateV2Value<DateV2ValueType>* time_val =
- (const DateV2Value<DateV2ValueType>*)(data_ref.data);
- int len = time_val->to_buffer(buf);
- ARROW_RETURN_NOT_OK(builder.Append(buf, len));
- break;
- }
- case vectorized::TypeIndex::DateTimeV2: {
- char buf[64];
- const DateV2Value<DateTimeV2ValueType>* time_val =
- (const
DateV2Value<DateTimeV2ValueType>*)(data_ref.data);
- int len = time_val->to_buffer(buf);
- ARROW_RETURN_NOT_OK(builder.Append(buf, len));
- break;
- }
- case vectorized::TypeIndex::Int128: {
- auto string_temp = LargeIntValue::to_string(
- reinterpret_cast<const
PackedInt128*>(data_ref.data)->value);
- ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(),
string_temp.size()));
- break;
- }
- case vectorized::TypeIndex::JSONB: {
- std::string string_temp =
- JsonbToJson::jsonb_to_json_string(data_ref.data,
data_ref.size);
- ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(),
string_temp.size()));
- break;
- }
- default: {
- LOG(WARNING) << "can't convert this type = " <<
vectorized::getTypeName(type_idx)
- << " to arrow type";
- return arrow::Status::TypeError("unsupported column type");
- }
- }
- }
- return arrow::Status::OK();
- }
-
- // process doris Decimal
- arrow::Status Visit(const arrow::Decimal128Type& type) override {
- auto& builder = assert_cast<arrow::Decimal128Builder&>(*_cur_builder);
- size_t start = _cur_start;
- size_t num_rows = _cur_rows;
- if (auto* decimalv2_column = vectorized::check_and_get_column<
- vectorized::ColumnDecimal<vectorized::Decimal128V2>>(
- *vectorized::remove_nullable(_cur_col))) {
- std::shared_ptr<arrow::DataType> s_decimal_ptr =
- std::make_shared<arrow::Decimal128Type>(27, 9);
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = decimalv2_column->get_data_at(i);
- const PackedInt128* p_value = reinterpret_cast<const
PackedInt128*>(data_ref.data);
- int64_t high = (p_value->value) >> 64;
- uint64 low = p_value->value;
- arrow::Decimal128 value(high, low);
- ARROW_RETURN_NOT_OK(builder.Append(value));
- }
- return arrow::Status::OK();
- } else if (auto* decimal128_column = vectorized::check_and_get_column<
-
vectorized::ColumnDecimal<vectorized::Decimal128V3>>(
- *vectorized::remove_nullable(_cur_col))) {
- std::shared_ptr<arrow::DataType> s_decimal_ptr =
- std::make_shared<arrow::Decimal128Type>(38,
decimal128_column->get_scale());
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = decimal128_column->get_data_at(i);
- const PackedInt128* p_value = reinterpret_cast<const
PackedInt128*>(data_ref.data);
- int64_t high = (p_value->value) >> 64;
- uint64 low = p_value->value;
- arrow::Decimal128 value(high, low);
- ARROW_RETURN_NOT_OK(builder.Append(value));
- }
- return arrow::Status::OK();
- } else if (auto* decimal32_column = vectorized::check_and_get_column<
- vectorized::ColumnDecimal<vectorized::Decimal32>>(
- *vectorized::remove_nullable(_cur_col))) {
- std::shared_ptr<arrow::DataType> s_decimal_ptr =
- std::make_shared<arrow::Decimal128Type>(8,
decimal32_column->get_scale());
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = decimal32_column->get_data_at(i);
- const int32_t* p_value = reinterpret_cast<const
int32_t*>(data_ref.data);
- int64_t high = *p_value > 0 ? 0 : 1UL << 63;
- arrow::Decimal128 value(high, *p_value > 0 ? *p_value :
-*p_value);
- ARROW_RETURN_NOT_OK(builder.Append(value));
- }
- return arrow::Status::OK();
- } else if (auto* decimal64_column = vectorized::check_and_get_column<
- vectorized::ColumnDecimal<vectorized::Decimal64>>(
- *vectorized::remove_nullable(_cur_col))) {
- std::shared_ptr<arrow::DataType> s_decimal_ptr =
- std::make_shared<arrow::Decimal128Type>(18,
decimal64_column->get_scale());
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = decimal64_column->get_data_at(i);
- const int64_t* p_value = reinterpret_cast<const
int64_t*>(data_ref.data);
- int64_t high = *p_value > 0 ? 0 : 1UL << 63;
- arrow::Decimal128 value(high, *p_value > 0 ? *p_value :
-*p_value);
- ARROW_RETURN_NOT_OK(builder.Append(value));
- }
- return arrow::Status::OK();
- } else {
- return arrow::Status::TypeError("Unsupported column:" +
_cur_col->get_name());
- }
- }
- // process boolean
- arrow::Status Visit(const arrow::BooleanType& type) override {
- auto& builder = assert_cast<arrow::BooleanBuilder&>(*_cur_builder);
- size_t start = _cur_start;
- size_t num_rows = _cur_rows;
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = _cur_col->get_data_at(i);
- ARROW_RETURN_NOT_OK(builder.Append(*(const bool*)data_ref.data));
- }
- return arrow::Status::OK();
- }
-
- // process array type
- arrow::Status Visit(const arrow::ListType& type) override {
- auto& builder = assert_cast<arrow::ListBuilder&>(*_cur_builder);
- auto orignal_col = _cur_col;
- size_t start = _cur_start;
- size_t num_rows = _cur_rows;
-
- const vectorized::ColumnArray* array_column = nullptr;
- if (orignal_col->is_nullable()) {
- auto nullable_column =
- assert_cast<const
vectorized::ColumnNullable*>(orignal_col.get());
- array_column = assert_cast<const vectorized::ColumnArray*>(
- &nullable_column->get_nested_column());
- } else {
- array_column = assert_cast<const
vectorized::ColumnArray*>(orignal_col.get());
- }
- const auto& offsets = array_column->get_offsets();
- vectorized::ColumnPtr nested_column = array_column->get_data_ptr();
-
- // set current col/type/builder to nested
- _cur_col = nested_column;
- if (_cur_type->is_nullable()) {
- auto nullable_type = assert_cast<const
vectorized::DataTypeNullable*>(_cur_type.get());
- _cur_type = assert_cast<const vectorized::DataTypeArray*>(
- nullable_type->get_nested_type().get())
- ->get_nested_type();
- } else {
- _cur_type = assert_cast<const
vectorized::DataTypeArray*>(_cur_type.get())
- ->get_nested_type();
- }
- _cur_builder = builder.value_builder();
-
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = orignal_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- // append array elements in row i
- ARROW_RETURN_NOT_OK(builder.Append());
- _cur_start = offsets[i - 1];
- _cur_rows = offsets[i] - offsets[i - 1];
- ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*type.value_type(),
this));
- }
-
- return arrow::Status::OK();
- }
+ ~FromBlockConverter() = default;
Status convert(std::shared_ptr<arrow::RecordBatch>* out);
private:
- template <typename T>
- arrow::Status _visit(const T& type) {
- auto& builder = assert_cast<arrow::NumericBuilder<T>&>(*_cur_builder);
- size_t start = _cur_start;
- size_t num_rows = _cur_rows;
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- if (_cur_col->is_nullable()) {
- for (size_t i = start; i < start + num_rows; ++i) {
- bool is_null = _cur_col->is_null_at(i);
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- const auto& data_ref = _cur_col->get_data_at(i);
- ARROW_RETURN_NOT_OK(builder.Append(*(const typename
T::c_type*)data_ref.data));
- }
- } else {
- ARROW_RETURN_NOT_OK(builder.AppendValues(
- (const typename
T::c_type*)_cur_col->get_data_at(start).data, num_rows));
- }
- return arrow::Status::OK();
- }
-
const vectorized::Block& _block;
const std::shared_ptr<arrow::Schema>& _schema;
arrow::MemoryPool* _pool;
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index ecdd733e76b..38dd40ca4c4 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -105,8 +105,6 @@ Status convert_to_arrow_type(const TypeDescriptor& type,
std::shared_ptr<arrow::
}
break;
case TYPE_DECIMALV2:
- *result = std::make_shared<arrow::Decimal128Type>(27, 9);
- break;
case TYPE_DECIMAL32:
case TYPE_DECIMAL64:
case TYPE_DECIMAL128I:
diff --git a/be/src/vec/columns/column_array.cpp
b/be/src/vec/columns/column_array.cpp
index 9c40676af47..949e992af1d 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -410,6 +410,7 @@ void ColumnArray::update_crcs_with_value(uint32_t*
__restrict hash, PrimitiveTyp
}
void ColumnArray::insert(const Field& x) {
+ DCHECK_EQ(x.get_type(), Field::Types::Array);
if (x.is_null()) {
get_data().insert(Null());
get_offsets().push_back(get_offsets().back() + 1);
diff --git a/be/src/vec/columns/column_map.cpp
b/be/src/vec/columns/column_map.cpp
index 694d2e39f5c..46df3c4b59e 100644
--- a/be/src/vec/columns/column_map.cpp
+++ b/be/src/vec/columns/column_map.cpp
@@ -142,6 +142,7 @@ void ColumnMap::insert_data(const char*, size_t) {
}
void ColumnMap::insert(const Field& x) {
+ DCHECK_EQ(x.get_type(), Field::Types::Map);
const auto& map = doris::vectorized::get<const Map&>(x);
CHECK_EQ(map.size(), 2);
const auto& k_f = doris::vectorized::get<const Array&>(map[0]);
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index 81a495eabd1..b9d684eb53a 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -151,6 +151,10 @@ public:
const auto& real_field = vectorized::get<const JsonbField&>(x);
s = StringRef(real_field.get_value(), real_field.get_size());
} else {
+ DCHECK_EQ(x.get_type(), Field::Types::String);
+ // If `x.get_type()` is not String, such as UInt64, may get the
error
+ // `string column length is too large:
total_length=13744632839234567870`
+ // because `<String>(x).size() = 13744632839234567870`
s.data = vectorized::get<const String&>(x).data();
s.size = vectorized::get<const String&>(x).size();
}
diff --git a/be/src/vec/columns/column_struct.cpp
b/be/src/vec/columns/column_struct.cpp
index be0f4e7adde..8cd71d822b2 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -134,6 +134,7 @@ bool ColumnStruct::is_default_at(size_t n) const {
}
void ColumnStruct::insert(const Field& x) {
+ DCHECK_EQ(x.get_type(), Field::Types::Tuple);
const auto& tuple = x.get<const Tuple&>();
const size_t tuple_size = columns.size();
if (tuple.size() != tuple_size) {
diff --git a/be/src/vec/columns/column_vector.h
b/be/src/vec/columns/column_vector.h
index 444603c1d87..a7c1ddc7ccb 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -356,8 +356,9 @@ public:
// For example, during create column_const(1, uint8), will use
NearestFieldType
// to cast a uint8 to int64, so that the Field is int64, but the column is
created
- // using data_type, so that T == uint8. After the field is created, it
will be inserted
- // into the column, but its type is different from column's data type, so
that during column
+ // using data_type, so that T == uint8, NearestFieldType<T> == uint64.
+ // After the field is created, it will be inserted into the column,
+ // but its type is different from column's data type (int64 vs uint64), so
that during column
// insert method, should use NearestFieldType<T> to get the Field and get
it actual
// uint8 value and then insert into column.
void insert(const Field& x) override {
diff --git a/be/src/vec/data_types/data_type_time_v2.h
b/be/src/vec/data_types/data_type_time_v2.h
index 86ce7836c56..28360f7e4de 100644
--- a/be/src/vec/data_types/data_type_time_v2.h
+++ b/be/src/vec/data_types/data_type_time_v2.h
@@ -114,7 +114,9 @@ public:
DataTypeDateTimeV2(const DataTypeDateTimeV2& rhs) : _scale(rhs._scale) {}
TypeIndex get_type_id() const override { return TypeIndex::DateTimeV2; }
TypeDescriptor get_type_as_type_descriptor() const override {
- return TypeDescriptor(TYPE_DATETIMEV2);
+ auto desc = TypeDescriptor(TYPE_DATETIMEV2);
+ desc.scale = _scale;
+ return desc;
}
doris::FieldType get_storage_field_type() const override {
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
index 7077229d861..d5b2e1e70d7 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp
@@ -157,6 +157,12 @@ Status
DataTypeDateTimeSerDe::deserialize_one_cell_from_json(IColumn& column, Sl
return Status::OK();
}
+void DataTypeDateTimeSerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int start, int end,
+ const cctz::time_zone& ctz)
const {
+ _read_column_from_arrow<false>(column, arrow_array, start, end, ctz);
+}
+
void DataTypeDate64SerDe::write_column_to_arrow(const IColumn& column, const
NullMap* null_map,
arrow::ArrayBuilder*
array_builder, int start,
int end, const
cctz::time_zone& ctz) const {
@@ -196,9 +202,10 @@ static int64_t time_unit_divisor(arrow::TimeUnit::type
unit) {
}
}
-void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
- int start, int end,
- const cctz::time_zone& ctz)
const {
+template <bool is_date>
+void DataTypeDate64SerDe::_read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int start, int end,
+ const cctz::time_zone& ctz)
const {
auto& col_data = static_cast<ColumnVector<Int64>&>(column).get_data();
int64_t divisor = 1;
int64_t multiplier = 1;
@@ -237,13 +244,15 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn&
column, const arrow::A
}
} else if (arrow_array->type()->id() == arrow::Type::STRING) {
// to be compatible with old version, we use string type for date.
- auto concrete_array = dynamic_cast<const
arrow::StringArray*>(arrow_array);
- for (size_t value_i = start; value_i < end; ++value_i) {
- Int64 val = 0;
+ const auto* concrete_array = dynamic_cast<const
arrow::StringArray*>(arrow_array);
+ for (auto value_i = start; value_i < end; ++value_i) {
auto val_str = concrete_array->GetString(value_i);
- ReadBuffer rb(val_str.data(), val_str.size());
- read_datetime_text_impl(val, rb, ctz);
- col_data.emplace_back(val);
+ VecDateTimeValue v;
+ v.from_date_str(val_str.c_str(), val_str.length(), ctz);
+ if constexpr (is_date) {
+ v.cast_to_date();
+ }
+ col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
}
} else {
throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
@@ -251,6 +260,12 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn&
column, const arrow::A
}
}
+void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const
arrow::Array* arrow_array,
+ int start, int end,
+ const cctz::time_zone& ctz)
const {
+ _read_column_from_arrow<true>(column, arrow_array, start, end, ctz);
+}
+
template <bool is_binary_format>
Status DataTypeDate64SerDe::_write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.h
b/be/src/vec/data_types/serde/data_type_date64_serde.h
index 497ac2aeff4..5f5fc4f1c38 100644
--- a/be/src/vec/data_types/serde/data_type_date64_serde.h
+++ b/be/src/vec/data_types/serde/data_type_date64_serde.h
@@ -73,6 +73,11 @@ public:
int start, int end,
std::vector<StringRef>& buffer_list) const
override;
+protected:
+ template <bool is_date>
+ void _read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const;
+
private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
@@ -94,6 +99,8 @@ public:
Status deserialize_column_from_json_vector(IColumn& column,
std::vector<Slice>& slices,
int* num_deserialized,
const FormatOptions& options)
const override;
+ void read_column_from_arrow(IColumn& column, const arrow::Array*
arrow_array, int start,
+ int end, const cctz::time_zone& ctz) const
override;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
index 12d30961e3e..e8dd2274765 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -160,7 +160,10 @@ void
DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
// convert second
v.from_unixtime(utc_epoch / divisor, ctz);
// get rest time
- v.set_microsecond(utc_epoch % divisor);
+ // add 0 on the right to make it 6 digits. DateTimeV2Value
microsecond is 6 digits,
+ // the scale decides to keep the first few digits, so the valid
digits should be kept at the front.
+ // "2022-01-01 11:11:11.111", utc_epoch = 1641035471111, divisor =
1000, set_microsecond(111000)
+ v.set_microsecond((utc_epoch % divisor) * DIVISOR_FOR_MICRO /
divisor);
col_data.emplace_back(binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(v));
}
} else {
diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
index 95109ee408c..f07c449851e 100644
--- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp
@@ -102,15 +102,10 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn&
column, const arrow::A
int start, int end,
const cctz::time_zone& ctz)
const {
auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data();
- auto concrete_array = dynamic_cast<const arrow::Date32Array*>(arrow_array);
- int64_t divisor = 1;
- int64_t multiplier = 1;
-
- multiplier = 24 * 60 * 60; // day => secs
- for (size_t value_i = start; value_i < end; ++value_i) {
+ const auto* concrete_array = dynamic_cast<const
arrow::Date32Array*>(arrow_array);
+ for (auto value_i = start; value_i < end; ++value_i) {
DateV2Value<DateV2ValueType> v;
- v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) /
divisor * multiplier,
- ctz);
+ v.get_date_from_daynr(concrete_array->Value(value_i) + date_threshold);
col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>,
UInt32>(v));
}
}
diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
index d98f6cae2b0..92b69dbfca9 100644
--- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp
@@ -88,8 +88,8 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
arrow::ArrayBuilder*
array_builder, int start,
int end, const
cctz::time_zone& ctz) const {
auto& col = reinterpret_cast<const ColumnDecimal<T>&>(column);
- auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+ auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(27, 9);
for (size_t i = start; i < end; ++i) {
@@ -108,6 +108,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
}
// TODO: decimal256
} else if constexpr (std::is_same_v<T, Decimal128V3>) {
+ auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(38, col.get_scale());
for (size_t i = start; i < end; ++i) {
@@ -125,6 +126,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
array_builder->type()->name());
}
} else if constexpr (std::is_same_v<T, Decimal<Int32>>) {
+ auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(8, col.get_scale());
for (size_t i = start; i < end; ++i) {
@@ -139,6 +141,7 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
array_builder->type()->name());
}
} else if constexpr (std::is_same_v<T, Decimal<Int64>>) {
+ auto& builder =
reinterpret_cast<arrow::Decimal128Builder&>(*array_builder);
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(18, col.get_scale());
for (size_t i = start; i < end; ++i) {
@@ -152,6 +155,28 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const
IColumn& column, const
checkArrowStatus(builder.Append(value), column.get_name(),
array_builder->type()->name());
}
+ } else if constexpr (std::is_same_v<T, Decimal256>) {
+ auto& builder =
reinterpret_cast<arrow::Decimal256Builder&>(*array_builder);
+ std::shared_ptr<arrow::DataType> s_decimal_ptr =
+ std::make_shared<arrow::Decimal256Type>(76, col.get_scale());
+ for (size_t i = start; i < end; ++i) {
+ if (null_map && (*null_map)[i]) {
+ checkArrowStatus(builder.AppendNull(), column.get_name(),
+ array_builder->type()->name());
+ continue;
+ }
+ auto p_value = wide::Int256(col.get_element(i));
+ using half_type = wide::Int256::base_type; // uint64_t
+ half_type a0 = p_value.items[wide::Int256::_impl::little(0)];
+ half_type a1 = p_value.items[wide::Int256::_impl::little(1)];
+ half_type a2 = p_value.items[wide::Int256::_impl::little(2)];
+ half_type a3 = p_value.items[wide::Int256::_impl::little(3)];
+
+ std::array<uint64_t, 4> word_array = {a0, a1, a2, a3};
+ arrow::Decimal256 value(arrow::Decimal256::LittleEndianArray,
word_array);
+ checkArrowStatus(builder.Append(value), column.get_name(),
+ array_builder->type()->name());
+ }
} else {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_arrow with type " +
column.get_name());
@@ -162,14 +187,14 @@ template <typename T>
void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
const arrow::Array*
arrow_array, int start,
int end, const
cctz::time_zone& ctz) const {
- auto concrete_array = dynamic_cast<const
arrow::DecimalArray*>(arrow_array);
- const auto* arrow_decimal_type =
- static_cast<const arrow::DecimalType*>(arrow_array->type().get());
- const auto arrow_scale = arrow_decimal_type->scale();
auto& column_data = static_cast<ColumnDecimal<T>&>(column).get_data();
// Decimal<Int128> for decimalv2
// Decimal<Int128I> for deicmalv3
if constexpr (std::is_same_v<T, Decimal<Int128>>) {
+ const auto* concrete_array = dynamic_cast<const
arrow::DecimalArray*>(arrow_array);
+ const auto* arrow_decimal_type =
+ static_cast<const
arrow::DecimalType*>(arrow_array->type().get());
+ const auto arrow_scale = arrow_decimal_type->scale();
// TODO check precision
for (size_t value_i = start; value_i < end; ++value_i) {
auto value = *reinterpret_cast<const vectorized::Decimal128V2*>(
@@ -195,7 +220,13 @@ void
DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
}
} else if constexpr (std::is_same_v<T, Decimal128V3> || std::is_same_v<T,
Decimal64> ||
std::is_same_v<T, Decimal32>) {
- for (size_t value_i = start; value_i < end; ++value_i) {
+ const auto* concrete_array = dynamic_cast<const
arrow::DecimalArray*>(arrow_array);
+ for (auto value_i = start; value_i < end; ++value_i) {
+ column_data.emplace_back(*reinterpret_cast<const
T*>(concrete_array->Value(value_i)));
+ }
+ } else if constexpr (std::is_same_v<T, Decimal256>) {
+ const auto* concrete_array = dynamic_cast<const
arrow::Decimal256Array*>(arrow_array);
+ for (auto value_i = start; value_i < end; ++value_i) {
column_data.emplace_back(*reinterpret_cast<const
T*>(concrete_array->Value(value_i)));
}
} else {
@@ -257,7 +288,7 @@ Status DataTypeDecimalSerDe<T>::write_column_to_orc(const
std::string& timezone,
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
auto& v = col_data[row_id];
- orc::Int128 value(v >> 64, (uint64_t)v);
+ orc::Int128 value(v >> 64, (uint64_t)v); // TODO, Decimal256
will lose precision
cur_batch->values[row_id] = value;
}
}
diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
index 612c9ce4222..f59fc712d98 100644
--- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
@@ -165,13 +165,19 @@ void DataTypeIPv6SerDe::read_column_from_arrow(IColumn&
column, const arrow::Arr
buffer->data() + concrete_array->value_offset(offset_i));
const auto raw_data_len = concrete_array->value_length(offset_i);
- IPv6 ipv6_val;
- if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len)) {
- throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
- "parse number fail, string: '{}'",
- std::string(raw_data,
raw_data_len).c_str());
+ if (raw_data_len == 0) {
+ col_data.emplace_back(0);
+ } else {
+ IPv6 ipv6_val;
+ if (!IPv6Value::from_string(ipv6_val, raw_data, raw_data_len))
{
+ throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+ "parse number fail, string: '{}'",
+ std::string(raw_data,
raw_data_len).c_str());
+ }
+ col_data.emplace_back(ipv6_val);
}
- col_data.emplace_back(ipv6_val);
+ } else {
+ col_data.emplace_back(0);
}
}
}
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index f4fb6bbbb1f..522cf02c75f 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -215,14 +215,20 @@ void
DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
const auto* raw_data = buffer->data() +
concrete_array->value_offset(offset_i);
const auto raw_data_len =
concrete_array->value_length(offset_i);
- Int128 val = 0;
- ReadBuffer rb(raw_data, raw_data_len);
- if (!read_int_text_impl(val, rb)) {
- throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
- "parse number fail, string: '{}'",
- std::string(rb.position(),
rb.count()).c_str());
+ if (raw_data_len == 0) {
+ col_data.emplace_back(Int128()); // Int128() is NULL
+ } else {
+ Int128 val = 0;
+ ReadBuffer rb(raw_data, raw_data_len);
+ if (!read_int_text_impl(val, rb)) {
+ throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+ "parse number fail, string:
'{}'",
+ std::string(rb.position(),
rb.count()).c_str());
+ }
+ col_data.emplace_back(val);
}
- col_data.emplace_back(val);
+ } else {
+ col_data.emplace_back(Int128()); // Int128() is NULL
}
}
return;
diff --git a/be/test/vec/data_types/common_data_type_serder_test.h
b/be/test/vec/data_types/common_data_type_serder_test.h
new file mode 100644
index 00000000000..c5db1d16f18
--- /dev/null
+++ b/be/test/vec/data_types/common_data_type_serder_test.h
@@ -0,0 +1,445 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <arrow/record_batch.h>
+#include <gen_cpp/data.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <memory>
+
+#include "arrow/array/array_base.h"
+#include "arrow/type.h"
+#include "runtime/descriptors.h"
+#include "util/arrow/block_convertor.h"
+#include "util/arrow/row_batch.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+#include "vec/runtime/ipv6_value.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+// this test is gonna to be a data type serialize and deserialize functions
+// such as
+// 1. standard hive text ser-deserialize
+// deserialize_one_cell_from_hive_text (IColumn &column, Slice &slice, const
FormatOptions &options, int hive_text_complex_type_delimiter_level=1) const
+// deserialize_column_from_hive_text_vector (IColumn &column, std::vector<
Slice > &slices, int *num_deserialized, const FormatOptions &options, int
hive_text_complex_type_delimiter_level=1) const
+// serialize_one_cell_to_hive_text (const IColumn &column, int row_num,
BufferWritable &bw, FormatOptions &options, int
hive_text_complex_type_delimiter_level=1) const
+// 2. json format ser-deserialize which used in table not in doris database
+// serialize_one_cell_to_json (const IColumn &column, int row_num,
BufferWritable &bw, FormatOptions &options) const =0
+// serialize_column_to_json (const IColumn &column, int start_idx, int
end_idx, BufferWritable &bw, FormatOptions &options) const =0
+// deserialize_one_cell_from_json (IColumn &column, Slice &slice, const
FormatOptions &options) const =0
+// deserialize_column_from_json_vector (IColumn &column, std::vector< Slice >
&slices, uint64_t *num_deserialized, const FormatOptions &options) const =0
+// deserialize_column_from_fixed_json (IColumn &column, Slice &slice,
uint64_t rows, uint64_t *num_deserialized, const FormatOptions &options) const
+// insert_column_last_value_multiple_times (IColumn &column, uint64_t times)
const
+// 3. fe|be protobuffer ser-deserialize
+// write_column_to_pb (const IColumn &column, PValues &result, int start, int
end) const =0
+// read_column_from_pb (IColumn &column, const PValues &arg) const =0
+// 4. jsonb ser-deserialize which used in row-store situation
+// write_one_cell_to_jsonb (const IColumn &column, JsonbWriter &result, Arena
*mem_pool, int32_t col_id, int row_num) const =0
+// read_one_cell_from_jsonb (IColumn &column, const JsonbValue *arg) const =0
+// 5. mysql text ser-deserialize
+// write_column_to_mysql (const IColumn &column, MysqlRowBuffer< false >
&row_buffer, int row_idx, bool col_const, const FormatOptions &options) const =0
+// write_column_to_mysql (const IColumn &column, MysqlRowBuffer< true >
&row_buffer, int row_idx, bool col_const, const FormatOptions &options) const =0
+// 6. arrow ser-deserialize which used in spark-flink connector
+// write_column_to_arrow (const IColumn &column, const NullMap *null_map,
arrow::ArrayBuilder *array_builder, int start, int end, const cctz::time_zone
&ctz) const =0
+// read_column_from_arrow (IColumn &column, const arrow::Array *arrow_array,
int start, int end, const cctz::time_zone &ctz) const =0
+// 7. rapidjson ser-deserialize
+// write_one_cell_to_json (const IColumn &column, rapidjson::Value &result,
rapidjson::Document::AllocatorType &allocator, Arena &mem_pool, int row_num)
const
+// read_one_cell_from_json (IColumn &column, const rapidjson::Value &result)
const
+// convert_field_to_rapidjson (const vectorized::Field &field,
rapidjson::Value &target, rapidjson::Document::AllocatorType &allocator)
+// convert_array_to_rapidjson (const vectorized::Array &array,
rapidjson::Value &target, rapidjson::Document::AllocatorType &allocator)
+
+namespace doris::vectorized {
+
+class CommonDataTypeSerdeTest : public ::testing::Test {
+public:
+
////==================================================================================================================
+ // this is common function to check data in column against expected
results according different function in assert function
+ // which can be used in all column test
+ // such as run regress tests
+ // step1. we can set gen_check_data_in_assert to true, then we will
generate a file for check data, otherwise we will read the file to check data
+ // step2. we should write assert callback function to check data
+ static void check_data(
+ MutableColumns& columns, DataTypeSerDeSPtrs serders, char
col_spliter,
+ std::set<int> idxes, const std::string& column_data_file,
+ std::function<void(MutableColumns& load_cols, DataTypeSerDeSPtrs
serders)>
+ assert_callback,
+ bool is_hive_format = false, DataTypes dataTypes = {}) {
+ ASSERT_EQ(serders.size(), columns.size());
+ // Step 1: Insert data from `column_data_file` into the column and
check result with `check_data_file`
+ // Load column data and expected data from CSV files
+ std::vector<std::vector<std::string>> res;
+ struct stat buff;
+ if (stat(column_data_file.c_str(), &buff) == 0) {
+ if (S_ISREG(buff.st_mode)) {
+ // file
+ if (is_hive_format) {
+ load_data_and_assert_from_csv<true, true>(serders,
columns, column_data_file,
+ col_spliter,
idxes);
+ } else {
+ load_data_and_assert_from_csv<false, true>(serders,
columns, column_data_file,
+ col_spliter,
idxes);
+ }
+ } else if (S_ISDIR(buff.st_mode)) {
+ // dir
+ std::filesystem::path fs_path(column_data_file);
+ for (const auto& entry :
std::filesystem::directory_iterator(fs_path)) {
+ std::string file_path = entry.path().string();
+ std::cout << "load data from file: " << file_path <<
std::endl;
+ if (is_hive_format) {
+ load_data_and_assert_from_csv<true, true>(serders,
columns, file_path,
+ col_spliter,
idxes);
+ } else {
+ load_data_and_assert_from_csv<false, true>(serders,
columns, file_path,
+
col_spliter, idxes);
+ }
+ }
+ }
+ }
+
+ // Step 2: Validate the data in `column` matches `expected_data`
+ assert_callback(columns, serders);
+ }
+
+ // Helper function to load data from CSV, with index which splited by
spliter and load to columns
+ template <bool is_hive_format, bool generate_res_file>
+ static void load_data_and_assert_from_csv(const DataTypeSerDeSPtrs serders,
+ MutableColumns& columns, const
std::string& file_path,
+ const char spliter = ';',
+ const std::set<int> idxes = {0})
{
+ ASSERT_EQ(serders.size(), columns.size())
+ << "serder size: " << serders.size() << " column size: " <<
columns.size();
+ ASSERT_EQ(serders.size(), idxes.size())
+ << "serder size: " << serders.size() << " idxes size: " <<
idxes.size();
+ std::ifstream file(file_path);
+ if (!file) {
+ throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "can not open
the file: {} ",
+ file_path);
+ }
+
+ std::string line;
+ DataTypeSerDe::FormatOptions options;
+ std::vector<std::vector<std::string>> res;
+ MutableColumns assert_str_cols(columns.size());
+ for (size_t i = 0; i < columns.size(); ++i) {
+ assert_str_cols[i] = ColumnString::create();
+ }
+
+ while (std::getline(file, line)) {
+ std::stringstream lineStream(line);
+ std::string value;
+ int l_idx = 0;
+ int c_idx = 0;
+ std::vector<std::string> row;
+ while (std::getline(lineStream, value, spliter)) {
+ if (!value.starts_with("//") && idxes.contains(l_idx)) {
+ // load csv data
+ Slice string_slice(value.data(), value.size());
+ Status st;
+ // deserialize data
+ if constexpr (is_hive_format) {
+ st =
serders[c_idx]->deserialize_one_cell_from_hive_text(
+ *columns[c_idx], string_slice, options);
+ } else {
+ st =
serders[c_idx]->deserialize_one_cell_from_json(*columns[c_idx],
+
string_slice, options);
+ }
+ if (!st.ok()) {
+ // deserialize if happen error now we do not insert
any value for input column
+ // so we push a default value to column for row
alignment
+ columns[c_idx]->insert_default();
+ std::cout << "error in deserialize but continue: " <<
st.to_string()
+ << std::endl;
+ }
+ // serialize data
+ size_t row_num = columns[c_idx]->size() - 1;
+ assert_str_cols[c_idx]->reserve(columns[c_idx]->size());
+ VectorBufferWriter
bw(assert_cast<ColumnString&>(*assert_str_cols[c_idx]));
+ if constexpr (is_hive_format) {
+ st =
serders[c_idx]->serialize_one_cell_to_hive_text(*columns[c_idx],
+
row_num, bw, options);
+ EXPECT_TRUE(st.ok()) << st.to_string();
+ } else {
+ st =
serders[c_idx]->serialize_one_cell_to_json(*columns[c_idx], row_num,
+ bw,
options);
+ EXPECT_TRUE(st.ok()) << st.to_string();
+ }
+ bw.commit();
+ // assert data : origin data and serialized data should be
equal or generated
+ // file to check data
+ size_t assert_size = assert_str_cols[c_idx]->size();
+ if constexpr (!generate_res_file) {
+
EXPECT_EQ(assert_str_cols[c_idx]->get_data_at(assert_size - 1).to_string(),
+ string_slice.to_string())
+ << "column: " << columns[c_idx]->get_name() <<
" row: " << row_num
+ << " is_hive_format: " << is_hive_format;
+ }
+ ++c_idx;
+ }
+ res.push_back(row);
+ ++l_idx;
+ }
+ }
+
+ if (generate_res_file) {
+ // generate res
+ auto pos = file_path.find_last_of(".");
+ std::string hive_format = is_hive_format ? "_hive" : "";
+ std::string res_file = file_path.substr(0, pos) + hive_format +
"_serde_res.csv";
+ std::ofstream res_f(res_file);
+ if (!res_f.is_open()) {
+ throw std::ios_base::failure("Failed to open file." +
res_file);
+ }
+ for (size_t r = 0; r < assert_str_cols[0]->size(); ++r) {
+ for (size_t c = 0; c < assert_str_cols.size(); ++c) {
+ res_f << assert_str_cols[c]->get_data_at(r).to_string() <<
spliter;
+ }
+ res_f << std::endl;
+ }
+ res_f.close();
+ std::cout << "generate res file: " << res_file << std::endl;
+ }
+ }
+
+ // standard hive text ser-deserialize assert function
+ // pb serde now is only used RPCFncall and fold_constant_executor which
just write column data to pb value means
+ // just call write_column_to_pb
+ static void assert_pb_format(MutableColumns& load_cols, DataTypeSerDeSPtrs
serders) {
+ for (size_t i = 0; i < load_cols.size(); ++i) {
+ auto& col = load_cols[i];
+ std::cout << " now we are testing column : " << col->get_name() <<
std::endl;
+ // serialize to pb
+ PValues pv = PValues();
+ Status st = serders[i]->write_column_to_pb(*col, pv, 0,
col->size());
+ if (!st.ok()) {
+ std::cerr << "write_column_to_pb error: " << st.msg() <<
std::endl;
+ continue;
+ }
+ // deserialize from pb
+ auto except_column = col->clone_empty();
+ st = serders[i]->read_column_from_pb(*except_column, pv);
+ EXPECT_TRUE(st.ok()) << st.to_string();
+ // check pb value from expected column
+ PValues as_pv = PValues();
+ st = serders[i]->write_column_to_pb(*except_column, as_pv, 0,
except_column->size());
+ EXPECT_TRUE(st.ok()) << st.to_string();
+ EXPECT_EQ(pv.bytes_value_size(), as_pv.bytes_value_size());
+ // check column value
+ for (size_t j = 0; j < col->size(); ++j) {
+ auto cell = col->operator[](j);
+ auto except_cell = except_column->operator[](j);
+ EXPECT_EQ(cell, except_cell) << "column: " << col->get_name()
<< " row: " << j;
+ }
+ }
+ }
+
+ // actually this is block_to_jsonb and jsonb_to_block test
+ // static void assert_jsonb_format(MutableColumns& load_cols,
DataTypeSerDeSPtrs serders) {
+ // Arena pool;
+ // auto jsonb_column = ColumnString::create(); // jsonb column
+ // // maybe these load_cols has different size, so we keep it same
+ // size_t max_row_size = load_cols[0]->size();
+ // for (size_t i = 1; i < load_cols.size(); ++i) {
+ // if (load_cols[i]->size() > max_row_size) {
+ // max_row_size = load_cols[i]->size();
+ // }
+ // }
+ // // keep same rows
+ // for (size_t i = 0; i < load_cols.size(); ++i) {
+ // if (load_cols[i]->size() < max_row_size) {
+ // load_cols[i]->insert_many_defaults(max_row_size -
load_cols[i]->size());
+ // } else if (load_cols[i]->size() > max_row_size) {
+ // load_cols[i]->resize(max_row_size);
+ // }
+ // }
+ // jsonb_column->reserve(load_cols[0]->size());
+ // MutableColumns assert_cols;
+ // for (size_t i = 0; i < load_cols.size(); ++i) {
+ // assert_cols.push_back(load_cols[i]->assume_mutable());
+ // }
+ // for (size_t r = 0; r < load_cols[0]->size(); ++r) {
+ // JsonbWriterT<JsonbOutStream> jw;
+ // jw.writeStartObject();
+ // // serialize to jsonb
+ // for (size_t i = 0; i < load_cols.size(); ++i) {
+ // auto& col = load_cols[i];
+ // serders[i]->write_one_cell_to_jsonb(*col, jw, &pool, i, r);
+ // }
+ // jw.writeEndObject();
+ // jsonb_column->insert_data(jw.getOutput()->getBuffer(),
jw.getOutput()->getSize());
+ // }
+ // // deserialize jsonb column to assert column
+ // EXPECT_EQ(jsonb_column->size(), load_cols[0]->size());
+ // for (size_t r = 0; r < jsonb_column->size(); ++r) {
+ // StringRef jsonb_data = jsonb_column->get_data_at(r);
+ // auto pdoc =
JsonbDocument::checkAndCreateDocument(jsonb_data.data, jsonb_data.size);
+ // JsonbDocument& doc = *pdoc;
+ // size_t cIdx = 0;
+ // for (auto it = doc->begin(); it != doc->end(); ++it) {
+ // serders[cIdx]->read_one_cell_from_jsonb(*assert_cols[cIdx],
it->value());
+ // ++cIdx;
+ // }
+ // }
+ // // check column value
+ // for (size_t i = 0; i < load_cols.size(); ++i) {
+ // auto& col = load_cols[i];
+ // auto& assert_col = assert_cols[i];
+ // for (size_t j = 0; j < col->size(); ++j) {
+ // auto cell = col->operator[](j);
+ // auto assert_cell = assert_col->operator[](j);
+ // EXPECT_EQ(cell, assert_cell) << "column: " <<
col->get_name() << " row: " << j;
+ // }
+ // }
+ // }
+
+ // assert mysql text format, now we just simple assert not to fatal or
exception here
+ static void assert_mysql_format(MutableColumns& load_cols,
DataTypeSerDeSPtrs serders) {
+ MysqlRowBuffer<false> row_buffer;
+ for (size_t i = 0; i < load_cols.size(); ++i) {
+ auto& col = load_cols[i];
+ for (size_t j = 0; j < col->size(); ++j) {
+ Status st;
+ EXPECT_NO_FATAL_FAILURE(
+ st = serders[i]->write_column_to_mysql(*col,
row_buffer, j, false, {}));
+ EXPECT_TRUE(st.ok()) << st.to_string();
+ }
+ }
+ }
+
+ // assert arrow serialize
+ static void assert_arrow_format(MutableColumns& load_cols, DataTypes
types) {
+ // make a block to write to arrow
+ auto block = std::make_shared<Block>();
+ build_block(block, load_cols, types);
+ auto record_batch = serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ deserialize_arrow(assert_block, record_batch);
+ compare_two_blocks(block, assert_block);
+ }
+
+ static void build_block(const std::shared_ptr<Block>& block,
MutableColumns& load_cols,
+ DataTypes types) {
+ // maybe these load_cols has different size, so we keep it same
+ size_t max_row_size = load_cols[0]->size();
+ for (size_t i = 1; i < load_cols.size(); ++i) {
+ if (load_cols[i]->size() > max_row_size) {
+ max_row_size = load_cols[i]->size();
+ }
+ }
+ // keep same rows
+ for (auto& load_col : load_cols) {
+ if (load_col->size() < max_row_size) {
+ load_col->insert_many_defaults(max_row_size -
load_col->size());
+ } else if (load_col->size() > max_row_size) {
+ load_col->resize(max_row_size);
+ }
+ }
+ for (size_t i = 0; i < load_cols.size(); ++i) {
+ auto& col = load_cols[i];
+ block->insert(ColumnWithTypeAndName(std::move(col), types[i],
types[i]->get_name()));
+ }
+ // print block
+ std::cout << "build block structure: " << block->dump_structure() <<
std::endl;
+ std::cout << "build block data: "
+ << block->dump_data(0, std::min(max_row_size,
static_cast<size_t>(5)))
+ << std::endl;
+ for (int i = 0; i < block->columns(); i++) {
+ auto col = block->get_by_position(i);
+ std::cout << "col: " << i << ", " << col.column->get_name() << ", "
+ << col.type->get_name() << ", " << col.to_string(0) <<
std::endl;
+ }
+ }
+
+ static std::shared_ptr<arrow::RecordBatch> serialize_arrow(
+ const std::shared_ptr<Block>& block) {
+ std::shared_ptr<arrow::Schema> block_arrow_schema;
+ EXPECT_EQ(get_arrow_schema_from_block(*block, &block_arrow_schema,
"UTC"), Status::OK());
+ std::cout << "schema: " << block_arrow_schema->ToString(true) <<
std::endl;
+ // convert block to arrow
+ std::shared_ptr<arrow::RecordBatch> result;
+ cctz::time_zone _timezone_obj; //default UTC
+ Status stt = convert_to_arrow_batch(*block, block_arrow_schema,
+ arrow::default_memory_pool(),
&result, _timezone_obj);
+ EXPECT_EQ(Status::OK(), stt) << "convert block to arrow failed" <<
stt.to_string();
+ std::cout << "arrow serialize result: " << result->num_columns() << ",
"
+ << result->num_rows() << std::endl;
+ return result;
+ }
+
+ static void deserialize_arrow(const std::shared_ptr<Block>& new_block,
+ std::shared_ptr<arrow::RecordBatch>
record_batch) {
+ // deserialize arrow to block
+ auto rows = record_batch->num_rows();
+ for (size_t i = 0; i < record_batch->num_columns(); ++i) {
+ auto array = record_batch->column(i);
+ std::cout << "arrow record_batch pos: " << i << ", array: " <<
array->ToString()
+ << std::endl;
+ auto& column_with_type_and_name = new_block->get_by_position(i);
+ std::cout << "now we are testing column: "
+ << column_with_type_and_name.column->get_name()
+ << ", type: " <<
column_with_type_and_name.type->get_name() << std::endl;
+ auto ret =
+ arrow_column_to_doris_column(array.get(), 0,
column_with_type_and_name.column,
+
column_with_type_and_name.type, rows, "UTC");
+ // do check data
+ std::cout << "arrow_column_to_doris_column done, column data: "
+ << column_with_type_and_name.to_string(0)
+ << ", column size: " <<
column_with_type_and_name.column->size() << std::endl;
+ EXPECT_EQ(Status::OK(), ret) << "convert arrow to block failed" <<
ret.to_string();
+ }
+ std::cout << "arrow deserialize block structure: " <<
new_block->dump_structure()
+ << std::endl;
+ std::cout << "arrow deserialize block data: "
+ << new_block->dump_data(
+ 0, std::min(static_cast<size_t>(rows),
static_cast<size_t>(5)))
+ << std::endl;
+ }
+
+ static void compare_two_blocks(const std::shared_ptr<Block>& frist_block,
+ const std::shared_ptr<Block>& second_block)
{
+ for (size_t i = 0; i < frist_block->columns(); ++i) {
+ EXPECT_EQ(frist_block->get_by_position(i).type,
second_block->get_by_position(i).type);
+ auto& col = frist_block->get_by_position(i).column;
+ auto& assert_col = second_block->get_by_position(i).column;
+ std::cout << "compare_two_blocks, column: " << col->get_name()
+ << ", type: " <<
frist_block->get_by_position(i).type->get_name()
+ << ", size: " << col->size() << ", assert size: " <<
assert_col->size()
+ << std::endl;
+ EXPECT_EQ(assert_col->size(), col->size());
+ for (size_t j = 0; j < assert_col->size(); ++j) {
+ EXPECT_EQ(frist_block->get_by_position(i).to_string(j),
+ second_block->get_by_position(i).to_string(j));
+ auto cell = col->operator[](j);
+ auto assert_cell = assert_col->operator[](j);
+ EXPECT_EQ(cell, assert_cell) << "column: " << col->get_name()
<< " row: " << j;
+ }
+ }
+ EXPECT_EQ(frist_block->dump_data(), second_block->dump_data());
+ }
+
+ // assert rapidjson format
+ // now rapidjson write_one_cell_to_json and read_one_cell_from_json only
used in column_object
+ // can just be replaced by jsonb format
+};
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/data_type_map_test.cpp
b/be/test/vec/data_types/data_type_map_test.cpp
new file mode 100644
index 00000000000..548802b0d91
--- /dev/null
+++ b/be/test/vec/data_types/data_type_map_test.cpp
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_map.h"
+
+#include <execinfo.h> // for backtrace on Linux
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <iostream>
+
+#include "common/exception.h"
+#include "vec/columns/column.h"
+#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/function/function_test_util.h"
+
+namespace doris::vectorized {
+
+// TODO `DataTypeMapSerDe::deserialize_one_cell_from_json` has a bug,
+// `SerdeArrowTest` cannot test Map type nested Array and Struct and Map,
+// so manually construct data to test them.
+// Expect to delete this TEST after `deserialize_one_cell_from_json` is fixed.
+TEST(DataTypeMapTest, SerdeNestedTypeArrowTest) {
+ auto block = std::make_shared<Block>();
+ {
+ std::string col_name = "map_nesting_array";
+ DataTypePtr f1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+ DataTypePtr dt1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f1));
+ DataTypePtr dt2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f2));
+ DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2);
+
+ Array a1, a2, a3, a4;
+ a1.push_back(Field("cute"));
+ a1.push_back(Null());
+ a2.push_back(Field("clever"));
+ a1.push_back(Field("hello"));
+ a3.push_back(1);
+ a3.push_back(2);
+ a4.push_back(11);
+ a4.push_back(22);
+
+ Array k1, v1;
+ k1.push_back(a1);
+ k1.push_back(a2);
+ v1.push_back(a3);
+ v1.push_back(a4);
+
+ Map m1;
+ m1.push_back(k1);
+ m1.push_back(v1);
+
+ MutableColumnPtr map_column = ma->create_column();
+ map_column->reserve(1);
+ map_column->insert(m1);
+ vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(),
ma, col_name);
+ block->insert(type_and_name);
+ }
+ {
+ std::string col_name = "map_nesting_struct";
+ DataTypePtr f1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+ DataTypePtr f3 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+ DataTypePtr f4 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr dt1 = std::make_shared<DataTypeNullable>(
+ std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {f1,
f2, f3}));
+ DataTypePtr dt2 = std::make_shared<DataTypeNullable>(
+ std::make_shared<DataTypeStruct>(std::vector<DataTypePtr>
{f4}));
+ DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2);
+
+ Tuple t1, t2, t3, t4;
+ t1.push_back(Field("clever"));
+ t1.push_back(__int128_t(37));
+ t1.push_back(true);
+ t2.push_back("null");
+ t2.push_back(__int128_t(26));
+ t2.push_back(false);
+ t3.push_back(Field("cute"));
+ t4.push_back("null");
+
+ Array k1, v1;
+ k1.push_back(t1);
+ k1.push_back(t2);
+ v1.push_back(t3);
+ v1.push_back(t4);
+
+ Map m1;
+ m1.push_back(k1);
+ m1.push_back(v1);
+
+ MutableColumnPtr map_column = ma->create_column();
+ map_column->reserve(1);
+ map_column->insert(m1);
+ vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(),
ma, col_name);
+ block->insert(type_and_name);
+ }
+ {
+ std::string col_name = "map_nesting_map";
+ DataTypePtr f1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+ DataTypePtr f2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f3 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+ DataTypePtr f4 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+ DataTypePtr dt1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f1, f2));
+ DataTypePtr dt2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f3, f4));
+ DataTypePtr ma = std::make_shared<DataTypeMap>(dt1, dt2);
+
+ Array k1, k2, k3, k4, v1, v2, v3, v4;
+ k1.push_back(1);
+ k1.push_back(2);
+ k2.push_back(11);
+ k2.push_back(22);
+ v1.push_back(Field("map"));
+ v1.push_back(Null());
+ v2.push_back(Field("clever map"));
+ v2.push_back(Field("hello map"));
+ k3.push_back(__int128_t(37));
+ k3.push_back(__int128_t(26));
+ k4.push_back(__int128_t(1111));
+ k4.push_back(__int128_t(432535423));
+ v3.push_back(true);
+ v3.push_back(false);
+ v4.push_back(false);
+ v4.push_back(true);
+
+ Map m11, m12, m21, m22;
+ m11.push_back(k1);
+ m11.push_back(v1);
+ m12.push_back(k2);
+ m12.push_back(v2);
+ m21.push_back(k3);
+ m21.push_back(v3);
+ m22.push_back(k4);
+ m22.push_back(v4);
+
+ Array kk1, vv1;
+ kk1.push_back(m11);
+ kk1.push_back(m12);
+ vv1.push_back(m21);
+ vv1.push_back(m22);
+
+ Map m1;
+ m1.push_back(kk1);
+ m1.push_back(vv1);
+
+ MutableColumnPtr map_column = ma->create_column();
+ map_column->reserve(1);
+ map_column->insert(m1);
+ vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(),
ma, col_name);
+ block->insert(type_and_name);
+ }
+ std::shared_ptr<arrow::RecordBatch> record_batch =
+ CommonDataTypeSerdeTest::serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+ CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/data_type_struct_test.cpp
b/be/test/vec/data_types/data_type_struct_test.cpp
new file mode 100644
index 00000000000..1fc8aac0312
--- /dev/null
+++ b/be/test/vec/data_types/data_type_struct_test.cpp
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_struct.h"
+
+#include <execinfo.h> // for backtrace on Linux
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <iostream>
+
+#include "vec/columns/column.h"
+#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/function/function_test_util.h"
+
+namespace doris::vectorized {
+
+// TODO `DataTypeStructSerDe::deserialize_one_cell_from_json` has a bug,
+// `SerdeArrowTest` cannot test Struct type nested Array and Map and Struct,
+// so manually construct data to test them.
+// Expect to delete this TEST after `deserialize_one_cell_from_json` is fixed.
+TEST(DataTypeStructTest, SerdeNestedTypeArrowTest) {
+ auto block = std::make_shared<Block>();
+ {
+ std::string col_name = "struct_nesting_array_map_struct";
+ DataTypePtr f1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+ DataTypePtr f3 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f4 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+ DataTypePtr f5 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+ DataTypePtr f6 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>());
+ DataTypePtr dt1 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(f1));
+ DataTypePtr dt2 =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeMap>(f2, f3));
+ DataTypePtr dt3 = std::make_shared<DataTypeNullable>(
+ std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {f4,
f5, f6}));
+ DataTypePtr st =
std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {dt1, dt2, dt3});
+
+ // nested Array
+ Array a1, a2;
+ a1.push_back(Field("array"));
+ a1.push_back(Null());
+ a2.push_back(Field("lucky array"));
+ a2.push_back(Field("cute array"));
+
+ // nested Map
+ Array k1, k2, v1, v2;
+ k1.push_back(1);
+ k1.push_back(2);
+ k2.push_back(11);
+ k2.push_back(22);
+ v1.push_back(Field("map"));
+ v1.push_back(Null());
+ v2.push_back(Field("clever map"));
+ v2.push_back(Field("hello map"));
+
+ Map m1, m2;
+ m1.push_back(k1);
+ m1.push_back(v1);
+ m2.push_back(k2);
+ m2.push_back(v2);
+
+ // nested Struct
+ Tuple t1, t2;
+ t1.push_back(Field("clever"));
+ t1.push_back(__int128_t(37));
+ t1.push_back(true);
+ t2.push_back("null");
+ t2.push_back(__int128_t(26));
+ t2.push_back(false);
+
+ // Struct
+ Tuple tt1, tt2;
+ tt1.push_back(a1);
+ tt1.push_back(m1);
+ tt1.push_back(t1);
+ tt2.push_back(a2);
+ tt2.push_back(m2);
+ tt2.push_back(t2);
+
+ MutableColumnPtr struct_column = st->create_column();
+ struct_column->reserve(2);
+ struct_column->insert(tt1);
+ struct_column->insert(tt2);
+ vectorized::ColumnWithTypeAndName
type_and_name(struct_column->get_ptr(), st, col_name);
+ block->insert(type_and_name);
+ }
+ std::shared_ptr<arrow::RecordBatch> record_batch =
+ CommonDataTypeSerdeTest::serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+ CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
index eb960abdfc1..f4caa35069f 100644
--- a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
@@ -1,4 +1,3 @@
-
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -31,11 +30,10 @@
#include <gen_cpp/types.pb.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
-#include <math.h>
-#include <stdint.h>
-#include <stdlib.h>
-#include <time.h>
+#include <gtest/gtest.h>
+#include <cmath>
+#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
@@ -43,26 +41,21 @@
#include <utility>
#include <vector>
-#include "gtest/gtest_pred_impl.h"
#include "olap/hll.h"
#include "runtime/descriptors.cpp"
-#include "runtime/descriptors.h"
#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
-#include "util/bitmap_value.h"
-#include "util/quantile_state.h"
#include "util/string_parser.hpp"
#include "vec/columns/column.h"
-#include "vec/columns/column_array.h"
#include "vec/columns/column_complex.h"
#include "vec/columns/column_decimal.h"
-#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/core/block.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
+#include "vec/data_types/common_data_type_serder_test.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_bitmap.h"
@@ -79,62 +72,29 @@
#include "vec/data_types/data_type_string.h"
#include "vec/data_types/data_type_struct.h"
#include "vec/data_types/data_type_time_v2.h"
-#include "vec/io/io_helper.h"
#include "vec/runtime/vdatetime_value.h"
#include "vec/utils/arrow_column_to_doris_column.h"
namespace doris::vectorized {
-template <bool is_scalar>
-void serialize_and_deserialize_arrow_test() {
- vectorized::Block block;
- std::vector<std::tuple<std::string, FieldType, int, PrimitiveType, bool>>
cols;
- if constexpr (is_scalar) {
- cols = {
- {"k1", FieldType::OLAP_FIELD_TYPE_INT, 1, TYPE_INT, false},
- {"k7", FieldType::OLAP_FIELD_TYPE_INT, 7, TYPE_INT, true},
- {"k2", FieldType::OLAP_FIELD_TYPE_STRING, 2, TYPE_STRING,
false},
- {"k3", FieldType::OLAP_FIELD_TYPE_DECIMAL128I, 3,
TYPE_DECIMAL128I, false},
- {"k11", FieldType::OLAP_FIELD_TYPE_DATETIME, 11,
TYPE_DATETIME, false},
- {"k4", FieldType::OLAP_FIELD_TYPE_BOOL, 4, TYPE_BOOLEAN,
false},
- {"k5", FieldType::OLAP_FIELD_TYPE_DECIMAL32, 5,
TYPE_DECIMAL32, false},
- {"k6", FieldType::OLAP_FIELD_TYPE_DECIMAL64, 6,
TYPE_DECIMAL64, false},
- {"k12", FieldType::OLAP_FIELD_TYPE_DATETIMEV2, 12,
TYPE_DATETIMEV2, false},
- {"k8", FieldType::OLAP_FIELD_TYPE_IPV4, 8, TYPE_IPV4, false},
- {"k9", FieldType::OLAP_FIELD_TYPE_IPV6, 9, TYPE_IPV6, false},
- };
- } else {
- cols = {{"a", FieldType::OLAP_FIELD_TYPE_ARRAY, 6, TYPE_ARRAY, true},
- {"m", FieldType::OLAP_FIELD_TYPE_MAP, 8, TYPE_MAP, true},
- {"s", FieldType::OLAP_FIELD_TYPE_STRUCT, 5, TYPE_STRUCT,
true}};
- }
-
- int row_num = 7;
- // make desc and generate block
- TupleDescriptor tuple_desc(PTupleDescriptor(), true);
- for (auto t : cols) {
- TSlotDescriptor tslot;
- std::string col_name = std::get<0>(t);
- tslot.__set_colName(col_name);
- TypeDescriptor type_desc(std::get<3>(t));
- bool is_nullable(std::get<4>(t));
- switch (std::get<3>(t)) {
- case TYPE_BOOLEAN:
- tslot.__set_slotType(type_desc.to_thrift());
- {
- auto vec = vectorized::ColumnVector<UInt8>::create();
- auto& data = vec->get_data();
- for (int i = 0; i < row_num; ++i) {
- data.push_back(i % 2);
- }
- vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeUInt8>());
- vectorized::ColumnWithTypeAndName
type_and_name(vec->get_ptr(), data_type,
- col_name);
- block.insert(std::move(type_and_name));
+void serialize_and_deserialize_arrow_test(std::vector<PrimitiveType> cols, int
row_num,
+ bool is_nullable) {
+ auto block = std::make_shared<Block>();
+ for (int i = 0; i < cols.size(); i++) {
+ std::string col_name = std::to_string(i);
+ TypeDescriptor type_desc(cols[i]);
+ switch (cols[i]) {
+ case TYPE_BOOLEAN: {
+ auto vec = vectorized::ColumnVector<UInt8>::create();
+ auto& data = vec->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ data.push_back(i % 2);
}
- break;
+ vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeUInt8>());
+ vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(),
data_type, col_name);
+ block->insert(std::move(type_and_name));
+ } break;
case TYPE_INT:
- tslot.__set_slotType(type_desc.to_thrift());
if (is_nullable) {
{
auto column_vector_int32 =
vectorized::ColumnVector<Int32>::create();
@@ -152,7 +112,7 @@ void serialize_and_deserialize_arrow_test() {
std::make_shared<vectorized::DataTypeInt32>());
vectorized::ColumnWithTypeAndName type_and_name(
mutable_nullable_vector->get_ptr(), data_type,
col_name);
- block.insert(type_and_name);
+ block->insert(type_and_name);
}
} else {
auto vec = vectorized::ColumnVector<Int32>::create();
@@ -163,13 +123,12 @@ void serialize_and_deserialize_arrow_test() {
vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeInt32>());
vectorized::ColumnWithTypeAndName
type_and_name(vec->get_ptr(), data_type,
col_name);
- block.insert(std::move(type_and_name));
+ block->insert(std::move(type_and_name));
}
break;
case TYPE_DECIMAL32:
type_desc.precision = 9;
type_desc.scale = 2;
- tslot.__set_slotType(type_desc.to_thrift());
{
vectorized::DataTypePtr decimal_data_type =
std::make_shared<DataTypeDecimal<Decimal32>>(type_desc.precision,
@@ -197,13 +156,12 @@ void serialize_and_deserialize_arrow_test() {
vectorized::ColumnWithTypeAndName
type_and_name(decimal_column->get_ptr(),
decimal_data_type, col_name);
- block.insert(type_and_name);
+ block->insert(type_and_name);
}
break;
case TYPE_DECIMAL64:
type_desc.precision = 18;
type_desc.scale = 6;
- tslot.__set_slotType(type_desc.to_thrift());
{
vectorized::DataTypePtr decimal_data_type =
std::make_shared<DataTypeDecimal<Decimal64>>(type_desc.precision,
@@ -229,13 +187,12 @@ void serialize_and_deserialize_arrow_test() {
}
vectorized::ColumnWithTypeAndName
type_and_name(decimal_column->get_ptr(),
decimal_data_type, col_name);
- block.insert(type_and_name);
+ block->insert(type_and_name);
}
break;
case TYPE_DECIMAL128I:
type_desc.precision = 27;
type_desc.scale = 9;
- tslot.__set_slotType(type_desc.to_thrift());
{
vectorized::DataTypePtr decimal_data_type(
doris::vectorized::create_decimal(27, 9, true));
@@ -244,146 +201,125 @@ void serialize_and_deserialize_arrow_test() {
decimal_column.get())
->get_data();
for (int i = 0; i < row_num; ++i) {
- __int128_t value = __int128_t(i * pow(10, 9) + i * pow(10,
8));
+ auto value = __int128_t(i * pow(10, 9) + i * pow(10, 8));
data.push_back(value);
}
vectorized::ColumnWithTypeAndName
type_and_name(decimal_column->get_ptr(),
decimal_data_type, col_name);
- block.insert(type_and_name);
+ block->insert(type_and_name);
}
break;
- case TYPE_STRING:
- tslot.__set_slotType(type_desc.to_thrift());
- {
- auto strcol = vectorized::ColumnString::create();
- for (int i = 0; i < row_num; ++i) {
- std::string is = std::to_string(i);
- strcol->insert_data(is.c_str(), is.size());
- }
- vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeString>());
- vectorized::ColumnWithTypeAndName
type_and_name(strcol->get_ptr(), data_type,
- col_name);
- block.insert(type_and_name);
+ case TYPE_STRING: {
+ auto strcol = vectorized::ColumnString::create();
+ for (int i = 0; i < row_num; ++i) {
+ std::string is = std::to_string(i);
+ strcol->insert_data(is.c_str(), is.size());
}
- break;
- case TYPE_HLL:
- tslot.__set_slotType(type_desc.to_thrift());
- {
- vectorized::DataTypePtr
hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
- auto hll_column = hll_data_type->create_column();
- std::vector<HyperLogLog>& container =
- ((vectorized::ColumnHLL*)hll_column.get())->get_data();
- for (int i = 0; i < row_num; ++i) {
- HyperLogLog hll;
- hll.update(i);
- container.push_back(hll);
- }
- vectorized::ColumnWithTypeAndName
type_and_name(hll_column->get_ptr(),
- hll_data_type,
col_name);
-
- block.insert(type_and_name);
+ vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeString>());
+ vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(),
data_type, col_name);
+ block->insert(type_and_name);
+ } break;
+ case TYPE_HLL: {
+ vectorized::DataTypePtr
hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
+ auto hll_column = hll_data_type->create_column();
+ std::vector<HyperLogLog>& container =
+ ((vectorized::ColumnHLL*)hll_column.get())->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ HyperLogLog hll;
+ hll.update(i);
+ container.push_back(hll);
}
- break;
- case TYPE_DATEV2:
- tslot.__set_slotType(type_desc.to_thrift());
- {
- auto column_vector_date_v2 =
vectorized::ColumnVector<vectorized::UInt32>::create();
- auto& date_v2_data = column_vector_date_v2->get_data();
- for (int i = 0; i < row_num; ++i) {
- DateV2Value<DateV2ValueType> value;
- value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
-
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
- }
- vectorized::DataTypePtr date_v2_type(
- std::make_shared<vectorized::DataTypeDateV2>());
- vectorized::ColumnWithTypeAndName
test_date_v2(column_vector_date_v2->get_ptr(),
- date_v2_type,
col_name);
- block.insert(test_date_v2);
+ vectorized::ColumnWithTypeAndName
type_and_name(hll_column->get_ptr(), hll_data_type,
+ col_name);
+
+ block->insert(type_and_name);
+ } break;
+ case TYPE_DATEV2: {
+ auto column_vector_date_v2 =
vectorized::ColumnVector<vectorized::UInt32>::create();
+ auto& date_v2_data = column_vector_date_v2->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ DateV2Value<DateV2ValueType> value;
+ value.from_date_int64(20210501);
+
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
}
- break;
+ vectorized::DataTypePtr
date_v2_type(std::make_shared<vectorized::DataTypeDateV2>());
+ vectorized::ColumnWithTypeAndName
test_date_v2(column_vector_date_v2->get_ptr(),
+ date_v2_type,
col_name);
+ block->insert(test_date_v2);
+ } break;
case TYPE_DATE: // int64
- tslot.__set_slotType(type_desc.to_thrift());
- {
- auto column_vector_date =
vectorized::ColumnVector<vectorized::Int64>::create();
- auto& date_data = column_vector_date->get_data();
- for (int i = 0; i < row_num; ++i) {
- VecDateTimeValue value;
- value.from_date_int64(20210501);
-
date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
- }
- vectorized::DataTypePtr
date_type(std::make_shared<vectorized::DataTypeDate>());
- vectorized::ColumnWithTypeAndName
test_date(column_vector_date->get_ptr(),
- date_type,
col_name);
- block.insert(test_date);
+ {
+ auto column_vector_date =
vectorized::ColumnVector<vectorized::Int64>::create();
+ auto& date_data = column_vector_date->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ VecDateTimeValue value;
+ value.from_date_int64(20210501);
+
date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
}
- break;
+ vectorized::DataTypePtr
date_type(std::make_shared<vectorized::DataTypeDate>());
+ vectorized::ColumnWithTypeAndName
test_date(column_vector_date->get_ptr(), date_type,
+ col_name);
+ block->insert(test_date);
+ } break;
case TYPE_DATETIME: // int64
- tslot.__set_slotType(type_desc.to_thrift());
- {
- auto column_vector_datetime =
vectorized::ColumnVector<vectorized::Int64>::create();
- auto& datetime_data = column_vector_datetime->get_data();
- for (int i = 0; i < row_num; ++i) {
- VecDateTimeValue value;
- value.from_date_int64(20210501080910);
-
datetime_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
- }
- vectorized::DataTypePtr datetime_type(
- std::make_shared<vectorized::DataTypeDateTime>());
- vectorized::ColumnWithTypeAndName
test_datetime(column_vector_datetime->get_ptr(),
- datetime_type,
col_name);
- block.insert(test_datetime);
+ {
+ auto column_vector_datetime =
vectorized::ColumnVector<vectorized::Int64>::create();
+ auto& datetime_data = column_vector_datetime->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ VecDateTimeValue value;
+ value.from_date_int64(20210501080910);
+
datetime_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
}
- break;
+ vectorized::DataTypePtr
datetime_type(std::make_shared<vectorized::DataTypeDateTime>());
+ vectorized::ColumnWithTypeAndName
test_datetime(column_vector_datetime->get_ptr(),
+ datetime_type,
col_name);
+ block->insert(test_datetime);
+ } break;
case TYPE_DATETIMEV2: // uint64
- tslot.__set_slotType(type_desc.to_thrift());
- {
- // 2022-01-01 11:11:11.111
- auto column_vector_datetimev2 =
- vectorized::ColumnVector<vectorized::UInt64>::create();
- // auto& datetimev2_data =
column_vector_datetimev2->get_data();
- DateV2Value<DateTimeV2ValueType> value;
- string date_literal = "2022-01-01 11:11:11.111";
- value.from_date_str(date_literal.c_str(), date_literal.size());
- char to[64] = {};
- std::cout << "value: " << value.to_string(to) << std::endl;
- for (int i = 0; i < row_num; ++i) {
- column_vector_datetimev2->insert(value.to_date_int_val());
- }
- vectorized::DataTypePtr datetimev2_type(
- std::make_shared<vectorized::DataTypeDateTimeV2>());
- vectorized::ColumnWithTypeAndName test_datetimev2(
- column_vector_datetimev2->get_ptr(), datetimev2_type,
col_name);
- block.insert(test_datetimev2);
+ {
+ auto column_vector_datetimev2 =
vectorized::ColumnVector<vectorized::UInt64>::create();
+ DateV2Value<DateTimeV2ValueType> value;
+ string date_literal = "2022-01-01 11:11:11.111";
+ cctz::time_zone ctz;
+ TimezoneUtils::find_cctz_time_zone("UTC", ctz);
+ EXPECT_TRUE(value.from_date_str(date_literal.c_str(),
date_literal.size(), ctz, 3));
+ char to[64] = {};
+ std::cout << "value: " << value.to_string(to) << std::endl;
+ for (int i = 0; i < row_num; ++i) {
+ column_vector_datetimev2->insert(value.to_date_int_val());
}
- break;
+ vectorized::DataTypePtr datetimev2_type(
+ std::make_shared<vectorized::DataTypeDateTimeV2>(3));
+ vectorized::ColumnWithTypeAndName
test_datetimev2(column_vector_datetimev2->get_ptr(),
+ datetimev2_type,
col_name);
+ block->insert(test_datetimev2);
+ } break;
case TYPE_ARRAY: // array
type_desc.add_sub_type(TYPE_STRING, true);
- tslot.__set_slotType(type_desc.to_thrift());
{
DataTypePtr s =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
DataTypePtr au = std::make_shared<DataTypeArray>(s);
Array a1, a2;
- a1.push_back(Field(String("sss")));
+ a1.push_back(Field("sss"));
a1.push_back(Null());
- a1.push_back(Field(String("clever amory")));
- a2.push_back(Field(String("hello amory")));
+ a1.push_back(Field("clever amory"));
+ a2.push_back(Field("hello amory"));
a2.push_back(Null());
- a2.push_back(Field(String("cute amory")));
- a2.push_back(Field(String("sf")));
+ a2.push_back(Field("cute amory"));
+ a2.push_back(Field("sf"));
MutableColumnPtr array_column = au->create_column();
array_column->reserve(2);
array_column->insert(a1);
array_column->insert(a2);
vectorized::ColumnWithTypeAndName
type_and_name(array_column->get_ptr(), au,
col_name);
- block.insert(type_and_name);
+ block->insert(type_and_name);
}
break;
case TYPE_MAP:
type_desc.add_sub_type(TYPE_STRING, true);
type_desc.add_sub_type(TYPE_STRING, true);
- tslot.__set_slotType(type_desc.to_thrift());
{
DataTypePtr s =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
@@ -416,14 +352,13 @@ void serialize_and_deserialize_arrow_test() {
map_column->insert(m1);
map_column->insert(m2);
vectorized::ColumnWithTypeAndName
type_and_name(map_column->get_ptr(), m, col_name);
- block.insert(type_and_name);
+ block->insert(type_and_name);
}
break;
case TYPE_STRUCT:
type_desc.add_sub_type(TYPE_STRING, "name", true);
type_desc.add_sub_type(TYPE_LARGEINT, "age", true);
type_desc.add_sub_type(TYPE_BOOLEAN, "is", true);
- tslot.__set_slotType(type_desc.to_thrift());
{
DataTypePtr s =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
@@ -434,10 +369,10 @@ void serialize_and_deserialize_arrow_test() {
DataTypePtr st =
std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {s, d, m});
Tuple t1, t2;
- t1.push_back(Field(String("amory cute")));
+ t1.push_back(Field("amory cute"));
t1.push_back(__int128_t(37));
t1.push_back(true);
- t2.push_back(Field("null"));
+ t2.push_back("null");
t2.push_back(__int128_t(26));
t2.push_back(false);
MutableColumnPtr struct_column = st->create_column();
@@ -446,144 +381,61 @@ void serialize_and_deserialize_arrow_test() {
struct_column->insert(t2);
vectorized::ColumnWithTypeAndName
type_and_name(struct_column->get_ptr(), st,
col_name);
- block.insert(type_and_name);
+ block->insert(type_and_name);
}
break;
- case TYPE_IPV4:
- tslot.__set_slotType(type_desc.to_thrift());
- {
- auto vec = vectorized::ColumnIPv4::create();
- auto& data = vec->get_data();
- for (int i = 0; i < row_num; ++i) {
- data.push_back(i);
- }
- vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeIPv4>());
- vectorized::ColumnWithTypeAndName
type_and_name(vec->get_ptr(), data_type,
- col_name);
- block.insert(std::move(type_and_name));
+ case TYPE_IPV4: {
+ auto vec = vectorized::ColumnIPv4::create();
+ auto& data = vec->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ data.push_back(i);
}
- break;
- case TYPE_IPV6:
- tslot.__set_slotType(type_desc.to_thrift());
- {
- auto vec = vectorized::ColumnIPv6::create();
- auto& data = vec->get_data();
- for (int i = 0; i < row_num; ++i) {
- data.push_back(i);
- }
- vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeIPv6>());
- vectorized::ColumnWithTypeAndName
type_and_name(vec->get_ptr(), data_type,
- col_name);
- block.insert(std::move(type_and_name));
+ vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeIPv4>());
+ vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(),
data_type, col_name);
+ block->insert(std::move(type_and_name));
+ } break;
+ case TYPE_IPV6: {
+ auto vec = vectorized::ColumnIPv6::create();
+ auto& data = vec->get_data();
+ for (int i = 0; i < row_num; ++i) {
+ data.push_back(i);
}
- break;
+ vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeIPv6>());
+ vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(),
data_type, col_name);
+ block->insert(std::move(type_and_name));
+ } break;
default:
- break;
+ LOG(FATAL) << "error column type";
}
-
- tslot.__set_col_unique_id(std::get<2>(t));
- SlotDescriptor* slot = new SlotDescriptor(tslot);
- tuple_desc.add_slot(slot);
}
-
- RowDescriptor row_desc(&tuple_desc, true);
- // arrow schema
- std::shared_ptr<arrow::Schema> _arrow_schema;
- EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"),
Status::OK());
-
- // serialize
- std::shared_ptr<arrow::RecordBatch> result;
- std::cout << "block data: " << block.dump_data(0, row_num) << std::endl;
- std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) <<
std::endl;
-
- cctz::time_zone timezone_obj;
- TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone,
timezone_obj);
- static_cast<void>(convert_to_arrow_batch(block, _arrow_schema,
arrow::default_memory_pool(),
- &result, timezone_obj));
- Block new_block = block.clone_empty();
- EXPECT_TRUE(result != nullptr);
- std::cout << "result: " << result->ToString() << std::endl;
- // deserialize
- for (auto t : cols) {
- std::string real_column_name = std::get<0>(t);
- auto* array = result->GetColumnByName(real_column_name).get();
- auto& column_with_type_and_name =
new_block.get_by_name(real_column_name);
- if (std::get<3>(t) == PrimitiveType::TYPE_DATE ||
- std::get<3>(t) == PrimitiveType::TYPE_DATETIME) {
- {
- auto strcol = vectorized::ColumnString::create();
- vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeString>());
- vectorized::ColumnWithTypeAndName
type_and_name(strcol->get_ptr(), data_type,
-
real_column_name);
- static_cast<void>(arrow_column_to_doris_column(
- array, 0, type_and_name.column, type_and_name.type,
block.rows(), "UTC"));
- {
- auto& col =
column_with_type_and_name.column.get()->assume_mutable_ref();
- auto& date_data =
static_cast<ColumnVector<Int64>&>(col).get_data();
- for (int i = 0; i < strcol->size(); ++i) {
- StringRef str = strcol->get_data_at(i);
- VecDateTimeValue value;
- value.from_date_str(str.data, str.size);
-
date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
- }
- }
- }
- continue;
- } else if (std::get<3>(t) == PrimitiveType::TYPE_DATEV2) {
- auto strcol = vectorized::ColumnString::create();
- vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeString>());
- vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(),
data_type,
- real_column_name);
- static_cast<void>(arrow_column_to_doris_column(
- array, 0, type_and_name.column, type_and_name.type,
block.rows(), "UTC"));
- {
- auto& col =
column_with_type_and_name.column.get()->assume_mutable_ref();
- auto& date_data =
static_cast<ColumnVector<UInt32>&>(col).get_data();
- for (int i = 0; i < strcol->size(); ++i) {
- StringRef str = strcol->get_data_at(i);
- DateV2Value<DateV2ValueType> value;
- value.from_date_str(str.data, str.size);
-
date_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
- }
- }
- continue;
- } else if (std::get<3>(t) == PrimitiveType::TYPE_DATETIMEV2) {
- // now we only support read doris datetimev2 to arrow
- block.erase(real_column_name);
- new_block.erase(real_column_name);
- continue;
- }
- static_cast<void>(arrow_column_to_doris_column(array, 0,
column_with_type_and_name.column,
-
column_with_type_and_name.type, block.rows(),
- "UTC"));
- }
-
- std::cout << block.dump_data() << std::endl;
- std::cout << new_block.dump_data() << std::endl;
- EXPECT_EQ(block.dump_data(), new_block.dump_data());
+ std::shared_ptr<arrow::RecordBatch> record_batch =
+ CommonDataTypeSerdeTest::serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+ CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
}
TEST(DataTypeSerDeArrowTest, DataTypeScalaSerDeTest) {
- serialize_and_deserialize_arrow_test<true>();
+ std::vector<PrimitiveType> cols = {
+ TYPE_INT, TYPE_INT, TYPE_STRING, TYPE_DECIMAL128I,
TYPE_BOOLEAN,
+ TYPE_DECIMAL32, TYPE_DECIMAL64, TYPE_IPV4, TYPE_IPV6,
TYPE_DATETIME,
+ TYPE_DATETIMEV2, TYPE_DATE, TYPE_DATEV2,
+ };
+ serialize_and_deserialize_arrow_test(cols, 7, true);
+ serialize_and_deserialize_arrow_test(cols, 7, false);
}
TEST(DataTypeSerDeArrowTest, DataTypeCollectionSerDeTest) {
- serialize_and_deserialize_arrow_test<false>();
+ std::vector<PrimitiveType> cols = {TYPE_ARRAY, TYPE_MAP, TYPE_STRUCT};
+ serialize_and_deserialize_arrow_test(cols, 7, true);
+ serialize_and_deserialize_arrow_test(cols, 7, false);
}
TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
- TupleDescriptor tuple_desc(PTupleDescriptor(), true);
- TSlotDescriptor tslot;
std::string col_name = "map_null_key";
- tslot.__set_colName(col_name);
- TypeDescriptor type_desc(TYPE_MAP);
- type_desc.add_sub_type(TYPE_STRING, true);
- type_desc.add_sub_type(TYPE_INT, true);
- tslot.__set_slotType(type_desc.to_thrift());
- vectorized::Block block;
+ auto block = std::make_shared<Block>();
{
DataTypePtr s =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
- ;
DataTypePtr d =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
DataTypePtr m = std::make_shared<DataTypeMap>(s, d);
Array k1, k2, v1, v2, k3, v3;
@@ -614,41 +466,14 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest)
{
map_column->insert(m2);
map_column->insert(m3);
vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(),
m, col_name);
- block.insert(type_and_name);
+ block->insert(type_and_name);
}
- tslot.__set_col_unique_id(1);
- SlotDescriptor* slot = new SlotDescriptor(tslot);
- tuple_desc.add_slot(slot);
- RowDescriptor row_desc(&tuple_desc, true);
- // arrow schema
- std::shared_ptr<arrow::Schema> _arrow_schema;
- EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"),
Status::OK());
-
- // serialize
- std::shared_ptr<arrow::RecordBatch> result;
- std::cout << "block structure: " << block.dump_structure() << std::endl;
- std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) <<
std::endl;
-
- cctz::time_zone timezone_obj;
- TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone,
timezone_obj);
- static_cast<void>(convert_to_arrow_batch(block, _arrow_schema,
arrow::default_memory_pool(),
- &result, timezone_obj));
- Block new_block = block.clone_empty();
- EXPECT_TRUE(result != nullptr);
- std::cout << "result: " << result->ToString() << std::endl;
- // deserialize
- auto* array = result->GetColumnByName(col_name).get();
- auto& column_with_type_and_name = new_block.get_by_name(col_name);
- static_cast<void>(arrow_column_to_doris_column(array, 0,
column_with_type_and_name.column,
-
column_with_type_and_name.type, block.rows(),
- "UTC"));
- std::cout << block.dump_data() << std::endl;
- std::cout << new_block.dump_data() << std::endl;
- // new block row_index 0, 2 which row has key null will be filter
- EXPECT_EQ(new_block.dump_one_line(0, 1), "{\"doris\":null, \"clever
amory\":30}");
- EXPECT_EQ(new_block.dump_one_line(2, 1), "{\"test\":11}");
- EXPECT_EQ(block.dump_data(1, 1), new_block.dump_data(1, 1));
+ std::shared_ptr<arrow::RecordBatch> record_batch =
+ CommonDataTypeSerdeTest::serialize_arrow(block);
+ auto assert_block = std::make_shared<Block>(block->clone_empty());
+ CommonDataTypeSerdeTest::deserialize_arrow(assert_block, record_batch);
+ CommonDataTypeSerdeTest::compare_two_blocks(block, assert_block);
}
} // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 19b21c16a45..be8c3dfd201 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -294,8 +294,10 @@ static doris::TupleDescriptor* create_tuple_desc(
t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27,
9).to_thrift());
} else {
TypeDescriptor descriptor(column_descs[i].type);
- if (column_descs[i].precision >= 0 && column_descs[i].scale >= 0) {
+ if (column_descs[i].precision >= 0) {
descriptor.precision = column_descs[i].precision;
+ }
+ if (column_descs[i].scale >= 0) {
descriptor.scale = column_descs[i].scale;
}
t_slot_desc.__set_slotType(descriptor.to_thrift());
diff --git a/be/test/vec/exprs/vexpr_test.cpp b/be/test/vec/exprs/vexpr_test.cpp
index 4c075cba848..76982d399fb 100644
--- a/be/test/vec/exprs/vexpr_test.cpp
+++ b/be/test/vec/exprs/vexpr_test.cpp
@@ -93,8 +93,10 @@ static doris::TupleDescriptor* create_tuple_desc(
t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27,
9).to_thrift());
} else {
TypeDescriptor descriptor(column_descs[i].type);
- if (column_descs[i].precision >= 0 && column_descs[i].scale >= 0) {
+ if (column_descs[i].precision >= 0) {
descriptor.precision = column_descs[i].precision;
+ }
+ if (column_descs[i].scale >= 0) {
descriptor.scale = column_descs[i].scale;
}
t_slot_desc.__set_slotType(descriptor.to_thrift());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]