This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new b638c07 [feature-wip](array-type) Support nested array insertion.
(#8305) (#8586)
b638c07 is described below
commit b638c07533c32d92c057774cca00773f69f86481
Author: Adonis Ling <[email protected]>
AuthorDate: Tue Mar 22 15:28:26 2022 +0800
[feature-wip](array-type) Support nested array insertion. (#8305) (#8586)
Please refer to #8304 .
---
be/src/exec/olap_scanner.cpp | 43 +-
be/src/olap/aggregate_func.cpp | 2 +
be/src/olap/field.h | 4 +-
be/src/olap/rowset/segment_v2/column_reader.cpp | 9 +-
be/src/olap/types.cpp | 13 +
be/src/olap/types.h | 49 +-
be/src/runtime/collection_value.cpp | 241 +++++++--
be/src/runtime/collection_value.h | 38 +-
be/src/runtime/mysql_result_writer.cpp | 17 +-
be/src/runtime/raw_value.cpp | 10 +-
be/src/runtime/row_batch.cpp | 50 +-
be/src/runtime/tuple.cpp | 129 ++---
be/test/runtime/CMakeLists.txt | 1 +
be/test/runtime/array_test.cpp | 556 +++++++++++++++++++++
.../main/java/org/apache/doris/catalog/Column.java | 6 +
15 files changed, 927 insertions(+), 241 deletions(-)
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 2c35d5b..7f29110 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -362,39 +362,16 @@ Status OlapScanner::get_batch(RuntimeState* state,
RowBatch* batch, bool* eof) {
// Copy collection slot
for (auto desc : _parent->_collection_slots) {
CollectionValue* slot =
tuple->get_collection_slot(desc->tuple_offset());
-
- TypeDescriptor item_type = desc->type().children.at(0);
- size_t item_size = item_type.get_slot_size() *
slot->length();
-
- size_t nulls_size = slot->length();
- uint8_t* data =
batch->tuple_data_pool()->allocate(item_size + nulls_size);
-
- // copy null_signs
- memory_copy(data, slot->null_signs(), nulls_size);
- memory_copy(data + nulls_size, slot->data(), item_size);
-
- slot->set_null_signs(reinterpret_cast<bool*>(data));
- slot->set_data(reinterpret_cast<char*>(data + nulls_size));
-
- if (!item_type.is_string_type()) {
- continue;
- }
-
- // when string type, copy every item
- for (int i = 0; i < slot->length(); ++i) {
- int item_offset = nulls_size + i *
item_type.get_slot_size();
- if (slot->is_null_at(i)) {
- continue;
- }
- StringValue* dst_item_v =
- reinterpret_cast<StringValue*>(data +
item_offset);
- if (dst_item_v->len != 0) {
- char* string_copy = reinterpret_cast<char*>(
-
batch->tuple_data_pool()->allocate(dst_item_v->len));
- memory_copy(string_copy, dst_item_v->ptr,
dst_item_v->len);
- dst_item_v->ptr = string_copy;
- }
- }
+ const TypeDescriptor& item_type =
desc->type().children.at(0);
+ auto pool = batch->tuple_data_pool();
+ CollectionValue::deep_copy_collection(
+ slot, item_type, [pool](int size) -> MemFootprint {
+ int64_t offset = pool->total_allocated_bytes();
+ uint8_t* data = pool->allocate(size);
+ return { offset, data };
+ },
+ false
+ );
}
// the memory allocate by mem pool has been copied,
// so we should release these memory immediately
diff --git a/be/src/olap/aggregate_func.cpp b/be/src/olap/aggregate_func.cpp
index f3c6d8b..6e844e7 100644
--- a/be/src/olap/aggregate_func.cpp
+++ b/be/src/olap/aggregate_func.cpp
@@ -119,6 +119,8 @@ AggregateFuncResolver::AggregateFuncResolver() {
OLAP_FIELD_TYPE_VARCHAR>();
add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
OLAP_FIELD_TYPE_CHAR>();
+ add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
+ OLAP_FIELD_TYPE_ARRAY>();
// Min Aggregate Function
add_aggregate_mapping<OLAP_FIELD_AGGREGATION_MIN,
OLAP_FIELD_TYPE_TINYINT>();
diff --git a/be/src/olap/field.h b/be/src/olap/field.h
index 0565955..4abcb4d 100644
--- a/be/src/olap/field.h
+++ b/be/src/olap/field.h
@@ -454,9 +454,7 @@ public:
char* allocate_memory(char* cell_ptr, char* variable_ptr) const override {
auto array_v = (CollectionValue*)cell_ptr;
- array_v->set_null_signs(reinterpret_cast<bool*>(variable_ptr +
sizeof(CollectionValue)));
- array_v->set_data(variable_ptr + sizeof(CollectionValue) +
- OLAP_ARRAY_MAX_BYTES / sizeof(char*));
+ array_v->set_null_signs(reinterpret_cast<bool*>(variable_ptr));
return variable_ptr + _length;
}
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index a0366b8..058bb79 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -57,22 +57,21 @@ Status ColumnReader::create(const ColumnReaderOptions&
opts, const ColumnMetaPB&
RETURN_IF_ERROR(ColumnReader::create(opts,
meta.children_columns(0),
meta.children_columns(0).num_rows(), path_desc,
&item_reader));
- RETURN_IF_ERROR(item_reader->init());
std::unique_ptr<ColumnReader> offset_reader;
RETURN_IF_ERROR(ColumnReader::create(opts,
meta.children_columns(1),
meta.children_columns(1).num_rows(), path_desc,
&offset_reader));
- RETURN_IF_ERROR(offset_reader->init());
std::unique_ptr<ColumnReader> null_reader;
if (meta.is_nullable()) {
RETURN_IF_ERROR(ColumnReader::create(opts,
meta.children_columns(2),
meta.children_columns(2).num_rows(), path_desc,
&null_reader));
- RETURN_IF_ERROR(null_reader->init());
}
+ // The num rows of the array reader equals the num rows of the
offset reader, i.e. children_columns(1).
+ num_rows = meta.children_columns(1).num_rows();
std::unique_ptr<ColumnReader> array_reader(
new ColumnReader(opts, meta, num_rows, path_desc));
// array reader do not need to init
@@ -127,7 +126,9 @@ Status ColumnReader::init() {
"Bad file $0: invalid column index type $1",
_path_desc.filepath, index_meta.type()));
}
}
- if (!is_empty() && _ordinal_index_meta == nullptr) {
+ // ArrayColumnWriter writes a single empty array and flushes. In this
scenario,
+ // the item writer doesn't write any data and the corresponding ordinal
index is empty.
+ if (_ordinal_index_meta == nullptr && !is_empty()) {
return Status::Corruption(strings::Substitute(
"Bad file $0: missing ordinal index for column $1",
_path_desc.filepath, _meta.column_id()));
}
diff --git a/be/src/olap/types.cpp b/be/src/olap/types.cpp
index 920c938..69761ad 100644
--- a/be/src/olap/types.cpp
+++ b/be/src/olap/types.cpp
@@ -98,6 +98,19 @@ bool is_scalar_type(FieldType field_type) {
}
}
+bool is_olap_string_type(FieldType field_type) {
+ switch (field_type) {
+ case OLAP_FIELD_TYPE_CHAR:
+ case OLAP_FIELD_TYPE_VARCHAR:
+ case OLAP_FIELD_TYPE_HLL:
+ case OLAP_FIELD_TYPE_OBJECT:
+ case OLAP_FIELD_TYPE_STRING:
+ return true;
+ default:
+ return false;
+ }
+}
+
std::shared_ptr<const TypeInfo> get_scalar_type_info(FieldType field_type) {
return ScalarTypeInfoResolver::instance()->get_type_info(field_type);
}
diff --git a/be/src/olap/types.h b/be/src/olap/types.h
index abfe4b6..e007df9 100644
--- a/be/src/olap/types.h
+++ b/be/src/olap/types.h
@@ -44,6 +44,8 @@
namespace doris {
class TabletColumn;
+extern bool is_olap_string_type(FieldType field_type);
+
class TypeInfo {
public:
virtual ~TypeInfo() = default;
@@ -254,6 +256,11 @@ public:
auto dest_value = reinterpret_cast<CollectionValue*>(dest);
auto src_value = reinterpret_cast<const CollectionValue*>(src);
+ if (src_value->length() == 0) {
+ new (dest_value) CollectionValue(src_value->length());
+ return;
+ }
+
dest_value->set_length(src_value->length());
size_t item_size = src_value->length() * _item_size;
@@ -284,20 +291,49 @@ public:
inline void direct_copy(void* dest, const void* src) const override {
auto dest_value = reinterpret_cast<CollectionValue*>(dest);
+ // NOTICE: The address pointed to by null_signs of the dest_value can NOT
be modified here.
+ auto base =
reinterpret_cast<uint8_t*>(dest_value->mutable_null_signs());
+ direct_copy(&base, dest, src);
+ }
+
+ inline void direct_copy(uint8_t** base, void* dest, const void* src) const
{
+ auto dest_value = reinterpret_cast<CollectionValue*>(dest);
auto src_value = reinterpret_cast<const CollectionValue*>(src);
+
+ auto nulls_size = src_value->has_null() ? src_value->length() : 0;
+ dest_value->set_data(src_value->length() ? (*base + nulls_size) :
nullptr);
dest_value->set_length(src_value->length());
dest_value->set_has_null(src_value->has_null());
if (src_value->has_null()) {
// direct copy null_signs
+ dest_value->set_null_signs(reinterpret_cast<bool*>(*base));
memory_copy(dest_value->mutable_null_signs(),
src_value->null_signs(),
src_value->length());
}
- // direct opy item
- for (uint32_t i = 0; i < src_value->length(); ++i) {
- if (dest_value->is_null_at(i)) continue;
-
_item_type_info->direct_copy((uint8_t*)(dest_value->mutable_data()) + i *
_item_size,
- (uint8_t*)(src_value->data()) + i *
_item_size);
+ *base += nulls_size + src_value->length() * _item_type_info->size();
+ // direct copy item
+ if (_item_type_info->type() == OLAP_FIELD_TYPE_ARRAY) {
+ for (uint32_t i = 0; i < src_value->length(); ++i) {
+ if (dest_value->is_null_at(i)) continue;
+ dynamic_cast<const ArrayTypeInfo*>(_item_type_info.get())
+ ->direct_copy(base,
(uint8_t*)(dest_value->mutable_data()) + i * _item_size,
+ (uint8_t*)(src_value->data()) + i *
_item_size);
+ }
+ } else {
+ for (uint32_t i = 0; i < src_value->length(); ++i) {
+ if (dest_value->is_null_at(i)) continue;
+ auto dest_address = (uint8_t*)(dest_value->mutable_data()) + i
* _item_size;
+ auto src_address = (uint8_t*)(src_value->data()) + i *
_item_size;
+ if (is_olap_string_type(_item_type_info->type())) {
+ auto dest_slice = reinterpret_cast<Slice*>(dest_address);
+ auto src_slice = reinterpret_cast<const
Slice*>(src_address);
+ dest_slice->data = reinterpret_cast<char*>(*base);
+ dest_slice->size = src_slice->size;
+ *base += src_slice->size;
+ }
+ _item_type_info->direct_copy(dest_address, src_address);
+ }
}
}
@@ -1075,8 +1111,7 @@ struct FieldTypeTraits<OLAP_FIELD_TYPE_VARCHAR> : public
FieldTypeTraits<OLAP_FI
case OLAP_FIELD_TYPE_DOUBLE:
case OLAP_FIELD_TYPE_DECIMAL: {
auto result = src_type->to_string(src);
- if (result.size() > variable_len)
- return OLAP_ERR_INPUT_PARAMETER_ERROR;
+ if (result.size() > variable_len) return
OLAP_ERR_INPUT_PARAMETER_ERROR;
auto slice = reinterpret_cast<Slice*>(dest);
slice->data =
reinterpret_cast<char*>(mem_pool->allocate(result.size()));
memcpy(slice->data, result.c_str(), result.size());
diff --git a/be/src/runtime/collection_value.cpp
b/be/src/runtime/collection_value.cpp
index 9b7ea7d..f738695 100644
--- a/be/src/runtime/collection_value.cpp
+++ b/be/src/runtime/collection_value.cpp
@@ -17,10 +17,24 @@
#include "runtime/collection_value.h"
+#include <functional>
+
#include "common/logging.h"
+#include "common/utils.h"
#include "exprs/anyval_util.h"
+#include "exprs/literal.h"
+#include "runtime/descriptors.h"
+#include "util//mem_util.hpp"
namespace doris {
+
+using AllocateMemFunc = std::function<uint8_t* (size_t size)>;
+static Status init_collection(
+ CollectionValue* value,
+ const AllocateMemFunc& allocate,
+ uint32_t size,
+ PrimitiveType child_type);
+
int sizeof_type(PrimitiveType type) {
switch (type) {
case TYPE_TINYINT:
@@ -32,6 +46,8 @@ int sizeof_type(PrimitiveType type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
return sizeof(StringValue);
+ case TYPE_ARRAY:
+ return sizeof(CollectionValue);
case TYPE_NULL:
return 0;
default:
@@ -50,6 +66,7 @@ Status type_check(PrimitiveType type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_NULL:
+ case TYPE_ARRAY:
break;
default:
return Status::InvalidArgument(fmt::format("Type not implemented: {}",
type));
@@ -80,73 +97,221 @@ void CollectionValue::copy_null_signs(const
CollectionValue* other) {
}
}
+size_t CollectionValue::get_byte_size(const TypeDescriptor& type) const {
+ size_t result = 0;
+ if (_length == 0) {
+ return result;
+ }
+ if (_has_null) {
+ result += _length * sizeof(bool);
+ }
+ const auto& item_type = type.children[0];
+ result += _length * item_type.get_slot_size();
+ if (item_type.is_string_type()) {
+ for (int i = 0; i < _length; ++ i) {
+ if (is_null_at(i)) {
+ continue;
+ }
+ int item_offset = i * item_type.get_slot_size();
+ StringValue* item =
reinterpret_cast<StringValue*>(((uint8_t*)_data) + item_offset);
+ result += item->len;
+ }
+ } else if (item_type.type == TYPE_ARRAY) {
+ for (int i = 0; i < _length; ++ i) {
+ if (is_null_at(i)) {
+ continue;
+ }
+ int item_offset = i * item_type.get_slot_size();
+ CollectionValue* item =
reinterpret_cast<CollectionValue*>(((uint8_t*)_data) + item_offset);
+ result += item->get_byte_size(item_type);
+ }
+ }
+ return result;
+}
+
ArrayIterator CollectionValue::iterator(PrimitiveType children_type) const {
return ArrayIterator(children_type, this);
}
Status CollectionValue::init_collection(ObjectPool* pool, uint32_t size,
PrimitiveType child_type,
- CollectionValue* val) {
- if (val == nullptr) {
+ CollectionValue* value) {
+ return doris::init_collection(value, [pool](size_t size) -> uint8_t* {
+ return pool->add_array(new uint8_t[size]);
+ },
+ size, child_type
+ );
+}
+
+static Status init_collection(
+ CollectionValue* value,
+ const AllocateMemFunc& allocate,
+ uint32_t size,
+ PrimitiveType child_type) {
+ if (value == nullptr) {
return Status::InvalidArgument("collection value is null");
}
RETURN_IF_ERROR(type_check(child_type));
if (size == 0) {
+ new (value) CollectionValue(size);
return Status::OK();
}
- val->_length = size;
- val->_null_signs = pool->add_array(new bool[size] {0});
- val->_data = pool->add_array(new uint8_t[size * sizeof_type(child_type)]);
+ value->_data = allocate(size * sizeof_type(child_type));
+ value->_length = size;
+ value->_has_null = false;
+ value->_null_signs = reinterpret_cast<bool*>(allocate(size));
+ memset(value->_null_signs, 0, size * sizeof(bool));
return Status::OK();
}
Status CollectionValue::init_collection(MemPool* pool, uint32_t size,
PrimitiveType child_type,
- CollectionValue* val) {
- if (val == nullptr) {
- return Status::InvalidArgument("collection value is null");
- }
-
- RETURN_IF_ERROR(type_check(child_type));
-
- if (size == 0) {
- return Status::OK();
- }
-
- val->_length = size;
- val->_null_signs = (bool*)pool->allocate(size * sizeof(bool));
- memset(val->_null_signs, 0, size);
+ CollectionValue* value) {
+ return doris::init_collection(value, [pool](size_t size) {
+ return pool->allocate(size);
+ },
+ size, child_type
+ );
+}
- val->_data = pool->allocate(sizeof_type(child_type) * size);
+Status CollectionValue::init_collection(FunctionContext* context, uint32_t
size,
+ PrimitiveType child_type,
CollectionValue* value) {
+ return doris::init_collection(value, [context](size_t size) {
+ return context->allocate(size);
+ },
+ size, child_type
+ );
+}
- return Status::OK();
+CollectionValue CollectionValue::from_collection_val(const CollectionVal& val)
{
+ return CollectionValue(val.data, val.length, val.has_null, val.null_signs);
}
-Status CollectionValue::init_collection(FunctionContext* context, uint32_t
size,
- PrimitiveType child_type,
CollectionValue* val) {
- if (val == nullptr) {
- return Status::InvalidArgument("collection value is null");
+// Deep copy collection.
+// NOTICE: The CollectionValue* shallow_copied_cv must be initialized by
calling memcpy first,
+// i.e. by copying the data of the original collection value into it.
+void CollectionValue::deep_copy_collection(
+ CollectionValue* shallow_copied_cv,
+ const TypeDescriptor& item_type,
+ const GenMemFootprintFunc& gen_mem_footprint,
+ bool convert_ptrs) {
+ CollectionValue* cv = shallow_copied_cv;
+ if (cv->length() == 0) {
+ return;
}
- RETURN_IF_ERROR(type_check(child_type));
+ int coll_byte_size = cv->length() * item_type.get_slot_size();
+ int nulls_size = cv->has_null() ? cv->length() * sizeof(bool) : 0;
- if (size == 0) {
- return Status::OK();
+ MemFootprint footprint = gen_mem_footprint(coll_byte_size + nulls_size);
+ int64_t offset = footprint.first;
+ char* coll_data = reinterpret_cast<char*>(footprint.second);
+
+ // copy and assign null_signs
+ if (cv->has_null()) {
+ memory_copy(convert_to<bool*>(coll_data), cv->null_signs(),
nulls_size);
+ cv->set_null_signs(convert_to<bool*>(coll_data));
+ } else {
+ cv->set_null_signs(nullptr);
}
+ // copy and assign data
+ memory_copy(coll_data + nulls_size, cv->data(), coll_byte_size);
+ cv->set_data(coll_data + nulls_size);
- val->_length = size;
- val->_null_signs = (bool*)context->allocate(size * sizeof(bool));
- memset(val->_null_signs, 0, size);
+ deep_copy_items_in_collection(cv, coll_data, item_type, gen_mem_footprint,
convert_ptrs);
- val->_data = context->allocate(sizeof_type(child_type) * size);
+ if (convert_ptrs) {
+ cv->set_data(convert_to<char*>(offset + nulls_size));
+ if (cv->has_null()) {
+ cv->set_null_signs(convert_to<bool*>(offset));
+ }
+ }
+}
- return Status::OK();
+// Deep copy items in collection.
+// NOTICE: The CollectionValue* shallow_copied_cv must be initialized by
calling memcpy first,
+// i.e. by copying the data of the original collection value into it.
+void CollectionValue::deep_copy_items_in_collection(
+ CollectionValue* shallow_copied_cv,
+ char* base,
+ const TypeDescriptor& item_type,
+ const GenMemFootprintFunc& gen_mem_footprint,
+ bool convert_ptrs) {
+ int nulls_size = shallow_copied_cv->has_null() ?
shallow_copied_cv->length() : 0;
+ char* item_base = base + nulls_size;
+ if (item_type.is_string_type()) {
+ // when itemtype is string, copy every string item
+ for (int i = 0; i < shallow_copied_cv->length(); ++ i) {
+ if (shallow_copied_cv->is_null_at(i)) {
+ continue;
+ }
+ char* item_offset = item_base + i * item_type.get_slot_size();
+ StringValue* dst_item_v = convert_to<StringValue*>(item_offset);
+ if (dst_item_v->len != 0) {
+ MemFootprint footprint = gen_mem_footprint(dst_item_v->len);
+ int64_t offset = footprint.first;
+ char* string_copy = reinterpret_cast<char*>(footprint.second);
+ memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len);
+ dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(offset) :
string_copy);
+ }
+ }
+ } else if (item_type.type == TYPE_ARRAY) {
+ for (int i = 0; i < shallow_copied_cv->length(); ++ i) {
+ if (shallow_copied_cv->is_null_at(i)) {
+ continue;
+ }
+ char* item_offset = item_base + i * item_type.get_slot_size();
+ CollectionValue* item_cv =
convert_to<CollectionValue*>(item_offset);
+ deep_copy_collection(item_cv, item_type.children[0],
gen_mem_footprint, convert_ptrs);
+ }
+ }
}
-CollectionValue CollectionValue::from_collection_val(const CollectionVal& val)
{
- return CollectionValue(val.data, val.length, val.has_null, val.null_signs);
+void CollectionValue::deserialize_collection(
+ CollectionValue* cv,
+ const char* tuple_data,
+ const TypeDescriptor& type) {
+ if (cv->length() == 0) {
+ new (cv) CollectionValue(cv->length());
+ return;
+ }
+ // assign the data and null_signs pointers to their positions in tuple_data
+ int data_offset = convert_to<int>(cv->data());
+ cv->set_data(convert_to<char*>(tuple_data + data_offset));
+ if (cv->has_null()) {
+ int null_offset = convert_to<int>(cv->null_signs());
+ cv->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
+ }
+
+ const TypeDescriptor& item_type = type.children[0];
+ if (item_type.is_string_type()) {
+ // copy every string item
+ for (size_t i = 0; i < cv->length(); ++i) {
+ if (cv->is_null_at(i)) {
+ continue;
+ }
+
+ StringValue* dst_item_v = convert_to<StringValue*>(
+ (uint8_t*)cv->data() + i * item_type.get_slot_size());
+
+ if (dst_item_v->len != 0) {
+ int offset = convert_to<int>(dst_item_v->ptr);
+ dst_item_v->ptr = convert_to<char*>(tuple_data + offset);
+ }
+ }
+ } else if (item_type.type == TYPE_ARRAY) {
+ for (size_t i = 0; i < cv->length(); ++i) {
+ if (cv->is_null_at(i)) {
+ continue;
+ }
+
+ CollectionValue* item_cv = convert_to<CollectionValue*>(
+ (uint8_t*)cv->data() + i * item_type.get_slot_size());
+ deserialize_collection(item_cv, tuple_data, item_type);
+ }
+ }
}
Status CollectionValue::set(uint32_t i, PrimitiveType type, const AnyVal*
value) {
@@ -183,6 +348,12 @@ Status CollectionValue::set(uint32_t i, PrimitiveType
type, const AnyVal* value)
dest->ptr = (char*)src->ptr;
break;
}
+ case TYPE_ARRAY: {
+ const CollectionVal* src = reinterpret_cast<const
CollectionVal*>(value);
+ CollectionValue* dest =
reinterpret_cast<CollectionValue*>(iter.value());
+ *dest = CollectionValue::from_collection_val(*src);
+ break;
+ }
default:
DCHECK(false) << "Type not implemented: " << type;
return Status::InvalidArgument("Type not implemented");
diff --git a/be/src/runtime/collection_value.h
b/be/src/runtime/collection_value.h
index ed9f4c8..6050c78 100644
--- a/be/src/runtime/collection_value.h
+++ b/be/src/runtime/collection_value.h
@@ -27,6 +27,10 @@ namespace doris {
using doris_udf::AnyVal;
+using MemFootprint = std::pair<int64_t, uint8_t*>;
+using GenMemFootprintFunc = std::function<MemFootprint (int size)>;
+
+struct TypeDescriptor;
class ArrayIterator;
/**
@@ -69,6 +73,8 @@ public:
void copy_null_signs(const CollectionValue* other);
+ size_t get_byte_size(const TypeDescriptor& type) const;
+
ArrayIterator iterator(PrimitiveType children_type) const;
/**
@@ -81,17 +87,41 @@ public:
* init collection, will alloc (children Type's size + 1) * (children
Nums) memory
*/
static Status init_collection(ObjectPool* pool, uint32_t size,
PrimitiveType child_type,
- CollectionValue* val);
+ CollectionValue* value);
static Status init_collection(MemPool* pool, uint32_t size, PrimitiveType
child_type,
- CollectionValue* val);
+ CollectionValue* value);
static Status init_collection(FunctionContext* context, uint32_t size,
PrimitiveType child_type,
- CollectionValue* val);
+ CollectionValue* value);
static CollectionValue from_collection_val(const CollectionVal& val);
- const void* data() const { return _data; }
+ // Deep copy collection.
+ // NOTICE: The CollectionValue* shallow_copied_cv must be initialized by
calling memcpy first,
+ // i.e. by copying the data of the original collection value into it.
+ static void deep_copy_collection(
+ CollectionValue* shallow_copied_cv,
+ const TypeDescriptor& item_type,
+ const GenMemFootprintFunc& gen_mem_footprint,
+ bool convert_ptrs);
+
+ // Deep copy items in collection.
+ // NOTICE: The CollectionValue* shallow_copied_cv must be initialized by
calling memcpy first,
+ // i.e. by copying the data of the original collection value into it.
+ static void deep_copy_items_in_collection(
+ CollectionValue* shallow_copied_cv,
+ char* base,
+ const TypeDescriptor& item_type,
+ const GenMemFootprintFunc& gen_mem_footprint,
+ bool convert_ptrs);
+
+ static void deserialize_collection(
+ CollectionValue* cv,
+ const char* tuple_data,
+ const TypeDescriptor& type);
+
+ inline const void* data() const { return _data; }
inline bool has_null() const { return _has_null; }
inline const bool* null_signs() const { return _null_signs; }
inline void* mutable_data() { return _data; }
diff --git a/be/src/runtime/mysql_result_writer.cpp
b/be/src/runtime/mysql_result_writer.cpp
index 45c297c..1cdfdc4 100644
--- a/be/src/runtime/mysql_result_writer.cpp
+++ b/be/src/runtime/mysql_result_writer.cpp
@@ -167,10 +167,10 @@ int MysqlResultWriter::_add_row_value(int index, const
TypeDescriptor& type, voi
}
case TYPE_ARRAY: {
- auto children_type = type.children[0].type;
+ auto children_type = type.children[0];
auto array_value = (const CollectionValue*)(item);
- ArrayIterator iter = array_value->iterator(children_type);
+ ArrayIterator iter = array_value->iterator(children_type.type);
_row_buffer->open_dynamic_mode();
@@ -181,14 +181,13 @@ int MysqlResultWriter::_add_row_value(int index, const
TypeDescriptor& type, voi
if (begin != 0) {
buf_ret = _row_buffer->push_string(", ", 2);
}
-
- if (children_type == TYPE_CHAR || children_type == TYPE_VARCHAR) {
- buf_ret = _row_buffer->push_string("'", 1);
- buf_ret = _add_row_value(index, children_type, iter.value());
- buf_ret = _row_buffer->push_string("'", 1);
+ if (!iter.value()) {
+ buf_ret = _row_buffer->push_string("NULL", 4);
} else {
- if (!iter.value()) {
- buf_ret = _row_buffer->push_string("NULL", 4);
+ if (children_type == TYPE_CHAR || children_type ==
TYPE_VARCHAR) {
+ buf_ret = _row_buffer->push_string("'", 1);
+ buf_ret = _add_row_value(index, children_type,
iter.value());
+ buf_ret = _row_buffer->push_string("'", 1);
} else {
buf_ret = _add_row_value(index, children_type,
iter.value());
}
diff --git a/be/src/runtime/raw_value.cpp b/be/src/runtime/raw_value.cpp
index ba51616..6d8641e 100644
--- a/be/src/runtime/raw_value.cpp
+++ b/be/src/runtime/raw_value.cpp
@@ -319,10 +319,10 @@ void RawValue::write(const void* value, void* dst, const
TypeDescriptor& type, M
CollectionValue* val = reinterpret_cast<CollectionValue*>(dst);
if (pool != nullptr) {
- auto children_type = type.children.at(0).type;
- CollectionValue::init_collection(pool, src->size(), children_type,
val);
- ArrayIterator src_iter = src->iterator(children_type);
- ArrayIterator val_iter = val->iterator(children_type);
+ const auto& item_type = type.children[0];
+ CollectionValue::init_collection(pool, src->size(),
item_type.type, val);
+ ArrayIterator src_iter = src->iterator(item_type.type);
+ ArrayIterator val_iter = val->iterator(item_type.type);
val->set_has_null(src->has_null());
val->copy_null_signs(src);
@@ -330,7 +330,7 @@ void RawValue::write(const void* value, void* dst, const
TypeDescriptor& type, M
while (src_iter.has_next() && val_iter.has_next()) {
if (!src_iter.is_null()) {
// write children
- write(src_iter.value(), val_iter.value(), children_type,
pool);
+ write(src_iter.value(), val_iter.value(), item_type, pool);
}
src_iter.next();
val_iter.next();
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index db97eee..8134feb 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -170,34 +170,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const
PRowBatch& input_batch)
CollectionValue* array_val =
tuple->get_collection_slot(slot_collection->tuple_offset());
-
- // assgin data and null_sign pointer position in tuple_data
- int data_offset = convert_to<int>(array_val->data());
- array_val->set_data(tuple_data + data_offset);
- if (array_val->has_null()) {
- int null_offset = convert_to<int>(array_val->null_signs());
- array_val->set_null_signs(convert_to<bool*>(tuple_data +
null_offset));
- }
-
- const TypeDescriptor& item_type =
slot_collection->type().children.at(0);
- if (!item_type.is_string_type()) {
- continue;
- }
-
- // copy every string item
- for (size_t k = 0; k < array_val->length(); ++k) {
- if (array_val->is_null_at(k)) {
- continue;
- }
-
- StringValue* dst_item_v = convert_to<StringValue*>(
- (uint8_t*)array_val->data() + k *
item_type.get_slot_size());
-
- if (dst_item_v->len != 0) {
- int offset = convert_to<int>(dst_item_v->ptr);
- dst_item_v->ptr = tuple_data + offset;
- }
- }
+ CollectionValue::deserialize_collection(array_val, tuple_data,
slot_collection->type());
}
}
}
@@ -592,26 +565,7 @@ size_t RowBatch::total_byte_size() const {
// compute data null_signs size
CollectionValue* array_val =
tuple->get_collection_slot(slot_collection->tuple_offset());
- if (array_val->has_null()) {
- result += array_val->length() * sizeof(bool);
- }
-
- const TypeDescriptor& item_type =
slot_collection->type().children.at(0);
- result += array_val->length() * item_type.get_slot_size();
-
- if (!item_type.is_string_type()) {
- continue;
- }
-
- // compute string type item size
- for (int k = 0; k < array_val->length(); ++k) {
- if (array_val->is_null_at(k)) {
- continue;
- }
- StringValue* dst_item_v = convert_to<StringValue*>(
- (uint8_t*)array_val->data() + k *
item_type.get_slot_size());
- result += dst_item_v->len;
- }
+ result += array_val->get_byte_size(slot_collection->type());
}
}
}
diff --git a/be/src/runtime/tuple.cpp b/be/src/runtime/tuple.cpp
index 75ae9ce..4644426 100644
--- a/be/src/runtime/tuple.cpp
+++ b/be/src/runtime/tuple.cpp
@@ -17,6 +17,7 @@
#include "runtime/tuple.h"
+#include <functional>
#include <iomanip>
#include <iostream>
#include <sstream>
@@ -36,6 +37,12 @@
namespace doris {
+static void deep_copy_collection_slots(
+ Tuple* shallow_copied_tuple,
+ const TupleDescriptor& desc,
+ const GenMemFootprintFunc& gen_mem_footprint,
+ bool convert_ptrs);
+
int64_t Tuple::total_byte_size(const TupleDescriptor& desc) const {
int64_t result = desc.byte_size();
if (!desc.has_varlen_slots()) {
@@ -87,50 +94,33 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor&
desc, MemPool* pool, bo
}
// copy collection slot
+ deep_copy_collection_slots(dst, desc, [pool](int size) ->MemFootprint {
+ int64_t offset = pool->total_allocated_bytes();
+ uint8_t* data = pool->allocate(size);
+ return { offset, data };
+ },
+ convert_ptrs
+ );
+}
+
+// Deep copy collection slots.
+// NOTICE: The Tuple* shallow_copied_tuple must be initialized by calling
memcpy first,
+// i.e. by copying the data of the original tuple into it.
+static void deep_copy_collection_slots(
+ Tuple* shallow_copied_tuple,
+ const TupleDescriptor& desc,
+ const GenMemFootprintFunc& gen_mem_footprint,
+ bool convert_ptrs) {
for (auto slot_desc : desc.collection_slots()) {
DCHECK(slot_desc->type().is_collection_type());
- if (dst->is_null(slot_desc->null_indicator_offset())) {
+ if (shallow_copied_tuple->is_null(slot_desc->null_indicator_offset()))
{
continue;
}
// copy collection item
- CollectionValue* cv =
dst->get_collection_slot(slot_desc->tuple_offset());
-
- const TypeDescriptor& item_type = slot_desc->type().children.at(0);
-
- int coll_byte_size = cv->length() * item_type.get_slot_size();
- int nulls_size = cv->has_null() ? cv->length() * sizeof(bool) : 0;
-
- int64_t offset = pool->total_allocated_bytes();
- char* coll_data = (char*)(pool->allocate(coll_byte_size + nulls_size));
-
- // copy data and null_signs
- memory_copy(convert_to<bool*>(coll_data), cv->null_signs(),
nulls_size);
- memory_copy(coll_data + nulls_size, cv->data(), coll_byte_size);
-
- // assgin new null_sign and data location
- if (cv->has_null()) {
- cv->set_null_signs(convert_ptrs ? convert_to<bool*>(offset) :
convert_to<bool*>(coll_data));
- }
- cv->set_data(convert_ptrs ? convert_to<char*>(offset + nulls_size) :
coll_data + nulls_size);
-
- if (!item_type.is_string_type()) {
- continue;
- }
- // when itemtype is string, copy every string item
- for (int i = 0; i < cv->length(); ++i) {
- int item_offset = nulls_size + i * item_type.get_slot_size();
- if (cv->is_null_at(i)) {
- continue;
- }
- StringValue* dst_item_v = convert_to<StringValue*>(coll_data +
item_offset);
- if (dst_item_v->len != 0) {
- int64_t offset = pool->total_allocated_bytes();
- char* string_copy = (char*)(pool->allocate(dst_item_v->len));
- memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len);
- dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(offset) :
string_copy);
- }
- }
+ CollectionValue* cv =
shallow_copied_tuple->get_collection_slot(slot_desc->tuple_offset());
+ CollectionValue::deep_copy_collection(
+ cv, slot_desc->type().children[0], gen_mem_footprint,
convert_ptrs);
}
}
@@ -197,61 +187,14 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char**
data, int64_t* offset,
}
// copy collection slots
- for (auto slot_desc : desc.collection_slots()) {
- DCHECK(slot_desc->type().is_collection_type());
- if (dst->is_null(slot_desc->null_indicator_offset())) {
- continue;
- }
- // get cv to copy elements
- CollectionValue* cv =
dst->get_collection_slot(slot_desc->tuple_offset());
- const TypeDescriptor& item_type = slot_desc->type().children.at(0);
-
- int coll_byte_size = cv->length() * item_type.get_slot_size();
- int nulls_size = cv->has_null() ? cv->length() * sizeof(bool) : 0;
-
- // copy null_sign
- memory_copy(*data, cv->null_signs(), nulls_size);
- // copy data
- memory_copy(*data + nulls_size, cv->data(), coll_byte_size);
-
- if (!item_type.is_string_type()) {
- if (cv->has_null()) {
- cv->set_null_signs(convert_ptrs ? convert_to<bool*>(*offset) :
convert_to<bool*>(*data));
- }
- cv->set_data(convert_ptrs ? convert_to<char*>(*offset + nulls_size)
- : *data + nulls_size);
- *data += coll_byte_size + nulls_size;
- *offset += coll_byte_size + nulls_size;
- continue;
- }
-
- // when item is string type, copy every item
- char* base_data = *data;
- int64_t base_offset = *offset;
-
- *data += coll_byte_size + nulls_size;
- *offset += coll_byte_size + nulls_size;
-
- for (int i = 0; i < cv->length(); ++i) {
- int item_offset = nulls_size + i * item_type.get_slot_size();
- if (cv->is_null_at(i)) {
- continue;
- }
- StringValue* dst_item_v = convert_to<StringValue*>(base_data +
item_offset);
- if (dst_item_v->len != 0) {
- memory_copy(*data, dst_item_v->ptr, dst_item_v->len);
- dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) :
*data);
- *data += dst_item_v->len;
- *offset += dst_item_v->len;
- }
- }
- // assgin new null_sign and data location
- if (cv->has_null()) {
- cv->set_null_signs(convert_ptrs ? convert_to<bool*>(base_offset) :
convert_to<bool*>(base_data));
- }
- cv->set_data(convert_ptrs ? convert_to<char*>(base_offset + nulls_size)
- : base_data + nulls_size);
- }
+ deep_copy_collection_slots(dst, desc, [offset, data](int size) ->
MemFootprint {
+ MemFootprint footprint = { *offset,
reinterpret_cast<uint8_t*>(*data) };
+ *offset += size;
+ *data += size;
+ return footprint;
+ },
+ convert_ptrs
+ );
}
template <bool collect_string_vals>
diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt
index 28eff33..06a1214 100644
--- a/be/test/runtime/CMakeLists.txt
+++ b/be/test/runtime/CMakeLists.txt
@@ -64,3 +64,4 @@ ADD_BE_TEST(memory/system_allocator_test)
ADD_BE_TEST(cache/partition_cache_test)
ADD_BE_TEST(collection_value_test)
#ADD_BE_TEST(minidump_test)
+ADD_BE_TEST(array_test)
diff --git a/be/test/runtime/array_test.cpp b/be/test/runtime/array_test.cpp
new file mode 100644
index 0000000..57a5cdb
--- /dev/null
+++ b/be/test/runtime/array_test.cpp
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+#include <rapidjson/rapidjson.h>
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "exprs/anyval_util.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "gen_cpp/segment_v2.pb.h"
+#include "olap/field.h"
+#include "olap/fs/block_manager.h"
+#include "olap/fs/fs_util.h"
+#include "olap/rowset/segment_v2/column_reader.h"
+#include "olap/rowset/segment_v2/column_writer.h"
+#include "olap/tablet_schema.h"
+#include "olap/types.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/raw_value.h"
+#include "testutil/desc_tbl_builder.h"
+#include "util/file_utils.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+template <typename... Ts>
+ColumnPB create_column_pb(const std::string& type, const Ts&...
sub_column_types) {
+ ColumnPB column;
+ column.set_type(type);
+ column.set_aggregation("NONE");
+ column.set_is_nullable(true);
+ if (type == "ARRAY") {
+ column.set_length(OLAP_ARRAY_MAX_BYTES);
+ }
+ if constexpr (sizeof...(sub_column_types) > 0) {
+ auto sub_column = create_column_pb(sub_column_types...);
+ column.add_children_columns()->Swap(&sub_column);
+ }
+ return column;
+}
+
+std::shared_ptr<const TypeInfo> get_type_info(const ColumnPB& column_pb) {
+ TabletColumn tablet_column;
+ tablet_column.init_from_pb(column_pb);
+ return get_type_info(&tablet_column);
+}
+
+std::unique_ptr<Field> create_field(const ColumnPB& column_pb) {
+ TabletColumn column;
+ column.init_from_pb(column_pb);
+ return std::unique_ptr<Field>(FieldFactory::create(column));
+}
+
+TypeDescriptor get_scalar_type_desc(const TypeInfo* type_info) {
+ switch (type_info->type()) {
+ case OLAP_FIELD_TYPE_INT:
+ return TypeDescriptor(TYPE_INT);
+ case OLAP_FIELD_TYPE_VARCHAR:
+ return
TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH);
+ default:
+ return TypeDescriptor();
+ }
+}
+
+TupleDescriptor* get_tuple_descriptor(ObjectPool& object_pool, const TypeInfo*
type_info) {
+ DescriptorTblBuilder builder(&object_pool);
+ auto& tuple_desc_builder = builder.declare_tuple();
+ if (type_info->type() == OLAP_FIELD_TYPE_ARRAY) {
+ TypeDescriptor type_desc(TYPE_ARRAY);
+ type_desc.len = OLAP_ARRAY_MAX_BYTES;
+ auto ptype = dynamic_cast<const
ArrayTypeInfo*>(type_info)->item_type_info().get();
+ while (ptype->type() == OLAP_FIELD_TYPE_ARRAY) {
+ type_desc.children.push_back(TypeDescriptor(TYPE_ARRAY));
+ ptype = dynamic_cast<const
ArrayTypeInfo*>(ptype)->item_type_info().get();
+ }
+ type_desc.children.push_back(get_scalar_type_desc(ptype));
+ tuple_desc_builder << type_desc;
+ } else {
+ tuple_desc_builder << get_scalar_type_desc(type_info);
+ }
+ return builder.build()->get_tuple_descriptor(0);
+}
+
+CollectionValue* parse(ObjectPool& object_pool,
+ const
rapidjson::GenericValue<rapidjson::UTF8<>>::ConstArray& json_array,
+ const TypeDescriptor& type_desc) {
+ if (json_array.Empty()) {
+ return object_pool.add(new CollectionValue(0));
+ } else {
+ auto array = object_pool.add(new CollectionValue());
+ const auto& item_type_desc = type_desc.children[0];
+ CollectionValue::init_collection(&object_pool, json_array.Size(),
item_type_desc.type,
+ array);
+ int index = 0;
+ switch (item_type_desc.type) {
+ case TYPE_ARRAY:
+ for (auto it = json_array.Begin(); it != json_array.End(); ++it) {
+ auto val = CollectionVal();
+ if (it->IsNull()) {
+ val.is_null = true;
+ } else {
+ auto sub_array = parse(object_pool, it->GetArray(),
item_type_desc);
+ sub_array->to_collection_val(&val);
+ }
+ array->set(index++, item_type_desc.type, &val);
+ }
+ break;
+ case TYPE_INT:
+ for (auto it = json_array.Begin(); it != json_array.End(); ++it) {
+ auto val = it->IsNull() ? IntVal::null() :
IntVal(it->GetInt());
+ array->set(index++, item_type_desc.type, &val);
+ }
+ break;
+ case TYPE_VARCHAR:
+ for (auto it = json_array.Begin(); it != json_array.End(); ++it) {
+ if (it->IsNull()) {
+ auto val = StringVal::null();
+ array->set(index++, item_type_desc.type, &val);
+ } else {
+ char* string = object_pool.add_array(new
char[it->GetStringLength()]);
+ memcpy(string, it->GetString(), it->GetStringLength());
+ auto val = StringVal(reinterpret_cast<uint8_t*>(string),
it->GetStringLength());
+ array->set(index++, item_type_desc.type, &val);
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ if (!array->has_null()) {
+ array->set_null_signs(nullptr);
+ }
+ return array;
+ }
+}
+
+CollectionValue* parse(ObjectPool& object_pool, const std::string& text,
+ const TypeDescriptor& type_desc) {
+ rapidjson::Document document;
+ if (document.Parse(text.c_str()).HasParseError() || !document.IsArray()) {
+ return nullptr;
+ }
+ return parse(object_pool, (const_cast<const
rapidjson::Document*>(&document))->GetArray(),
+ type_desc);
+}
+
+void validate(const Field* field, const CollectionValue* expect, const
CollectionValue* actual,
+ bool check_nullptr) {
+ EXPECT_TRUE(field->type_info()->equal(expect, actual));
+ if (check_nullptr) {
+ if (expect->length() == 0) {
+ EXPECT_EQ(nullptr, actual->data());
+ EXPECT_EQ(expect->data(), actual->data());
+ }
+ if (!expect->has_null()) {
+ EXPECT_EQ(nullptr, expect->null_signs());
+ EXPECT_EQ(expect->null_signs(), actual->null_signs());
+ }
+ }
+}
+
+void validate(const Field* field, const CollectionValue* expect, const
CollectionValue* actual) {
+ validate(field, expect, actual, true);
+}
+
+class ArrayTest : public ::testing::Test {
+public:
+ ArrayTest()
+ : _mem_tracker(new MemTracker(MAX_MEMORY_BYTES, "ArrayTest")),
+ _mem_pool(new MemPool(_mem_tracker.get())) {}
+
+protected:
+ void SetUp() override {
+ if (FileUtils::check_exist(TEST_DIR)) {
+ ASSERT_TRUE(FileUtils::remove_all(TEST_DIR).ok());
+ }
+ ASSERT_TRUE(FileUtils::create_dir(TEST_DIR).ok());
+ }
+
+ void TearDown() override {
+ if (FileUtils::check_exist(TEST_DIR)) {
+ ASSERT_TRUE(FileUtils::remove_all(TEST_DIR).ok());
+ }
+ }
+
+private:
+ void test_copy_array(const TupleDescriptor* tuple_desc, const Field* field,
+ const CollectionValue* array) {
+ auto slot_desc = tuple_desc->slots().front();
+ auto type_desc = slot_desc->type();
+ auto total_size = tuple_desc->byte_size() +
array->get_byte_size(type_desc);
+
+ auto src = allocate_tuple(total_size);
+ ASSERT_NE(src, nullptr);
+
+ RawValue::write(array, src, slot_desc, _mem_pool.get());
+ auto src_cv =
reinterpret_cast<CollectionValue*>(src->get_slot(slot_desc->tuple_offset()));
+ validate(field, array, src_cv);
+
+ auto dst = allocate_tuple(total_size);
+ ASSERT_NE(dst, nullptr);
+
+ src->deep_copy(dst, *tuple_desc, _mem_pool.get());
+ auto dst_cv =
reinterpret_cast<CollectionValue*>(dst->get_slot(slot_desc->tuple_offset()));
+ validate(field, src_cv, dst_cv);
+
+ dst->init(total_size);
+ int64_t offset = 0;
+ char* serialized_data = reinterpret_cast<char*>(dst);
+ src->deep_copy(*tuple_desc, &serialized_data, &offset, true);
+ EXPECT_EQ(total_size, offset);
+ EXPECT_EQ(total_size, serialized_data - reinterpret_cast<char*>(dst));
+ dst_cv =
reinterpret_cast<CollectionValue*>(dst->get_slot(slot_desc->tuple_offset()));
+ CollectionValue::deserialize_collection(dst_cv,
reinterpret_cast<char*>(dst), type_desc);
+ validate(field, src_cv, dst_cv);
+ }
+
+ Tuple* allocate_tuple(size_t size) {
+ auto tuple = reinterpret_cast<Tuple*>(_mem_pool->allocate(size));
+ if (tuple) {
+ tuple->init(size);
+ }
+ return tuple;
+ }
+
+ void test_direct_copy_array(const Field* field,
+ const std::vector<const CollectionValue*>&
arrays) {
+ CollectionValue cell;
+ std::unique_ptr<char[]> variable_ptr(new char[field->length()]);
+ field->allocate_memory(reinterpret_cast<char*>(&cell),
variable_ptr.get());
+ EXPECT_EQ(cell.null_signs(),
reinterpret_cast<bool*>(variable_ptr.get()));
+ for (auto array : arrays) {
+ field->type_info()->direct_copy(&cell, array);
+ EXPECT_EQ(cell.null_signs(),
reinterpret_cast<bool*>(variable_ptr.get()));
+ validate(field, array, &cell, false);
+ }
+ }
+
+ template <segment_v2::EncodingTypePB array_encoding,
segment_v2::EncodingTypePB item_encoding>
+ void test_write_and_read_column(const ColumnPB& column_pb, const Field*
field,
+ const std::vector<const CollectionValue*>&
arrays) {
+ const std::string path = TEST_DIR + "/" + generate_uuid_string();
+ LOG(INFO) << "Test directory: " << path;
+ segment_v2::ColumnMetaPB meta;
+ init_column_meta<array_encoding, item_encoding>(&meta, column_pb);
+ {
+ auto wblock = create_writable_block(path);
+ ASSERT_NE(wblock, nullptr);
+ auto writer = create_column_writer<array_encoding,
item_encoding>(wblock.get(), meta,
+
column_pb);
+ ASSERT_NE(writer, nullptr);
+ Status st;
+ for (auto array : arrays) {
+ st = writer->append(false,
const_cast<CollectionValue*>(array));
+ ASSERT_TRUE(st.ok());
+ }
+ ASSERT_TRUE(writer->finish().ok());
+ ASSERT_TRUE(writer->write_data().ok());
+ ASSERT_TRUE(writer->write_ordinal_index().ok());
+ ASSERT_TRUE(writer->write_zone_map().ok());
+
+ ASSERT_TRUE(wblock->close().ok());
+ }
+ {
+ auto reader = create_column_reader(path, meta, arrays.size());
+ ASSERT_NE(reader, nullptr);
+ auto rblock = create_readable_block(path);
+ ASSERT_NE(rblock, nullptr);
+ OlapReaderStatistics stats;
+ std::unique_ptr<segment_v2::ColumnIterator> iter(
+ new_iterator(rblock.get(), &stats, reader.get()));
+ ASSERT_NE(iter, nullptr);
+ auto st = iter->seek_to_first();
+ ASSERT_TRUE(st.ok()) << st.to_string();
+
+ auto tracker = std::make_shared<MemTracker>();
+ MemPool pool(tracker.get());
+ std::unique_ptr<ColumnVectorBatch> cvb;
+ ColumnVectorBatch::create(0, true, field->type_info(),
const_cast<Field*>(field), &cvb);
+ ASSERT_NE(cvb, nullptr) << st.to_string();
+ cvb->resize(1024);
+ ColumnBlock col(cvb.get(), &pool);
+
+ int index = 0;
+ size_t rows_read = 1024;
+ do {
+ ColumnBlockView dst(&col);
+ st = iter->next_batch(&rows_read, &dst);
+ ASSERT_TRUE(st.ok());
+ for (int i = 0; i < rows_read; ++i) {
+ validate(field, arrays[index++],
+ reinterpret_cast<const
CollectionValue*>(col.cell_ptr(i)), false);
+ }
+ ASSERT_TRUE(st.ok());
+ } while (rows_read >= 1024);
+ }
+ }
+ template <segment_v2::EncodingTypePB array_encoding,
segment_v2::EncodingTypePB item_encoding>
+ void init_column_meta(segment_v2::ColumnMetaPB* meta, const ColumnPB&
column_pb) {
+ int column_id = 0;
+ TabletColumn column;
+ column.init_from_pb(column_pb);
+ init_column_meta<array_encoding, item_encoding>(meta, &column_id,
column);
+ }
+
+ template <segment_v2::EncodingTypePB array_encoding,
segment_v2::EncodingTypePB item_encoding>
+ void init_column_meta(segment_v2::ColumnMetaPB* meta, int* column_id,
+ const TabletColumn& column) {
+ meta->set_column_id(*column_id);
+ meta->set_unique_id((*column_id)++);
+ meta->set_type(column.type());
+ meta->set_length(column.length());
+ if (column.type() == OLAP_FIELD_TYPE_ARRAY) {
+ meta->set_encoding(array_encoding);
+ } else {
+ meta->set_encoding(item_encoding);
+ }
+ meta->set_compression(segment_v2::LZ4F);
+ meta->set_is_nullable(true);
+ for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
+ init_column_meta<array_encoding,
item_encoding>(meta->add_children_columns(), column_id,
+
column.get_sub_column(i));
+ }
+ }
+
+ std::unique_ptr<fs::WritableBlock> create_writable_block(const
std::string& path) {
+ std::unique_ptr<fs::WritableBlock> wblock;
+ fs::CreateBlockOptions fs_opts(path);
+ auto st =
fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(fs_opts, &wblock);
+ return st.ok() ? std::move(wblock) : nullptr;
+ }
+
+ template <segment_v2::EncodingTypePB array_encoding,
segment_v2::EncodingTypePB item_encoding>
+ std::unique_ptr<segment_v2::ColumnWriter>
create_column_writer(fs::WritableBlock* wblock,
+
segment_v2::ColumnMetaPB& meta,
+ const
ColumnPB& column_pb) {
+ segment_v2::ColumnWriterOptions writer_opts = {.meta = &meta};
+ TabletColumn column;
+ column.init_from_pb(column_pb);
+ std::unique_ptr<segment_v2::ColumnWriter> writer;
+ auto st = segment_v2::ColumnWriter::create(writer_opts, &column,
wblock, &writer);
+ if (!st.ok()) {
+ return nullptr;
+ }
+ st = writer->init();
+ return st.ok() ? std::move(writer) : nullptr;
+ }
+
+ std::unique_ptr<segment_v2::ColumnReader> create_column_reader(
+ const std::string& path, const segment_v2::ColumnMetaPB& meta,
size_t num_rows) {
+ segment_v2::ColumnReaderOptions reader_opts;
+ FilePathDesc path_desc;
+ path_desc.filepath = path;
+ std::unique_ptr<segment_v2::ColumnReader> reader;
+ auto st = segment_v2::ColumnReader::create(reader_opts, meta,
num_rows, path_desc, &reader);
+ return st.ok() ? std::move(reader) : nullptr;
+ }
+
+ std::unique_ptr<fs::ReadableBlock> create_readable_block(const
std::string& path) {
+ std::unique_ptr<fs::ReadableBlock> rblock;
+ FilePathDesc path_desc;
+ path_desc.filepath = path;
+ auto block_manager = fs::fs_util::block_manager(TStorageMedium::HDD);
+ auto st = block_manager->open_block(path_desc, &rblock);
+ return st.ok() ? std::move(rblock) : nullptr;
+ }
+
+ segment_v2::ColumnIterator* new_iterator(fs::ReadableBlock* rblock,
OlapReaderStatistics* stats,
+ segment_v2::ColumnReader* reader)
{
+ segment_v2::ColumnIterator* iter = nullptr;
+ auto st = reader->new_iterator(&iter);
+ if (!st.ok()) {
+ return nullptr;
+ }
+ segment_v2::ColumnIteratorOptions iter_opts;
+ iter_opts.stats = stats;
+ iter_opts.rblock = rblock;
+ iter_opts.mem_tracker = std::make_shared<MemTracker>();
+ st = iter->init(iter_opts);
+ return st.ok() ? iter : nullptr;
+ }
+
+ template <segment_v2::EncodingTypePB array_encoding,
segment_v2::EncodingTypePB item_encoding>
+ void test_array(const ColumnPB& column_pb, const Field* field,
+ const TupleDescriptor* tuple_desc, const CollectionValue*
array) {
+ test_copy_array(tuple_desc, field, array);
+ test_direct_copy_array(field, {array});
+ test_write_and_read_column<array_encoding, item_encoding>(column_pb,
field, {array});
+ }
+
+private:
+ static constexpr size_t MAX_MEMORY_BYTES = 1024 * 1024;
+ static const std::string TEST_DIR;
+ std::unique_ptr<MemTracker> _mem_tracker;
+ std::unique_ptr<MemPool> _mem_pool;
+ ObjectPool _object_pool;
+};
+
+const std::string ArrayTest::TEST_DIR = "./ut_dir/array_test";
+
+TEST_F(ArrayTest, TestSimpleIntArrays) {
+ auto column_pb = create_column_pb("ARRAY", "INT");
+ auto type_info = get_type_info(column_pb);
+ auto field = create_field(column_pb);
+ auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
+ ASSERT_EQ(tuple_desc->slots().size(), 1);
+ auto type_desc = tuple_desc->slots().front()->type();
+
+ std::vector<const CollectionValue*> arrays = {
+ parse(_object_pool, "[]", type_desc),
+ parse(_object_pool, "[null]", type_desc),
+ parse(_object_pool, "[1, 2, 3]", type_desc),
+ parse(_object_pool, "[1, null, 3]", type_desc),
+ parse(_object_pool, "[1, null, null]", type_desc),
+ parse(_object_pool, "[null, null, 3]", type_desc),
+ parse(_object_pool, "[null, null, null]", type_desc),
+ };
+ for (auto array : arrays) {
+ test_array<segment_v2::DEFAULT_ENCODING,
segment_v2::BIT_SHUFFLE>(column_pb, field.get(),
+
tuple_desc, array);
+ }
+ test_direct_copy_array(field.get(), arrays);
+ test_write_and_read_column<segment_v2::DEFAULT_ENCODING,
segment_v2::BIT_SHUFFLE>(
+ column_pb, field.get(), arrays);
+}
+
+TEST_F(ArrayTest, TestNestedIntArrays) {
+ // depth 2
+ auto column_pb = create_column_pb("ARRAY", "ARRAY", "INT");
+ auto type_info = get_type_info(column_pb);
+ auto field = create_field(column_pb);
+ auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
+ ASSERT_EQ(tuple_desc->slots().size(), 1);
+ auto type_desc = tuple_desc->slots().front()->type();
+
+ std::vector<const CollectionValue*> arrays = {
+ parse(_object_pool, "[]", type_desc),
+ parse(_object_pool, "[[]]", type_desc),
+ parse(_object_pool, "[[1, 2, 3], [4, 5, 6]]", type_desc),
+ parse(_object_pool, "[[1, 2, 3], null, [4, 5, 6]]", type_desc),
+ parse(_object_pool, "[[1, 2, null], null, [4, null, 6], null,
[null, 8, 9]]",
+ type_desc),
+ };
+ for (auto array : arrays) {
+ test_array<segment_v2::DEFAULT_ENCODING,
segment_v2::BIT_SHUFFLE>(column_pb, field.get(),
+
tuple_desc, array);
+ }
+ test_direct_copy_array(field.get(), arrays);
+ test_write_and_read_column<segment_v2::DEFAULT_ENCODING,
segment_v2::BIT_SHUFFLE>(
+ column_pb, field.get(), arrays);
+
+ // depth 3
+ column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", "INT");
+ type_info = get_type_info(column_pb);
+ field = create_field(column_pb);
+ tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
+ ASSERT_EQ(tuple_desc->slots().size(), 1);
+ type_desc = tuple_desc->slots().front()->type();
+ arrays.clear();
+ ASSERT_EQ(arrays.size(), 0);
+
+ arrays = {
+ parse(_object_pool, "[]", type_desc),
+ parse(_object_pool, "[[]]", type_desc),
+ parse(_object_pool, "[[[]]]", type_desc),
+ parse(_object_pool, "[[[null]], [[1], [2, 3]], [[4, 5, 6], null,
null]]", type_desc),
+ };
+ for (auto array : arrays) {
+ test_array<segment_v2::DEFAULT_ENCODING,
segment_v2::BIT_SHUFFLE>(column_pb, field.get(),
+
tuple_desc, array);
+ }
+ test_direct_copy_array(field.get(), arrays);
+ test_write_and_read_column<segment_v2::DEFAULT_ENCODING,
segment_v2::BIT_SHUFFLE>(
+ column_pb, field.get(), arrays);
+}
+
+TEST_F(ArrayTest, TestSimpleStringArrays) {
+ auto column_pb = create_column_pb("ARRAY", "VARCHAR");
+ auto type_info = get_type_info(column_pb);
+ auto field = create_field(column_pb);
+ auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
+ ASSERT_EQ(tuple_desc->slots().size(), 1);
+ auto type_desc = tuple_desc->slots().front()->type();
+
+ std::vector<const CollectionValue*> arrays = {
+ parse(_object_pool, "[]", type_desc),
+ parse(_object_pool, "[null]", type_desc),
+ parse(_object_pool, "[\"a\", \"b\", \"c\"]", type_desc),
+ parse(_object_pool, "[null, \"b\", \"c\"]", type_desc),
+ parse(_object_pool, "[\"a\", null, \"c\"]", type_desc),
+ parse(_object_pool, "[\"a\", \"b\", null]", type_desc),
+ parse(_object_pool, "[null, \"b\", null]", type_desc),
+ parse(_object_pool, "[null, null, null]", type_desc),
+ };
+ for (auto array : arrays) {
+ test_array<segment_v2::DEFAULT_ENCODING,
segment_v2::DICT_ENCODING>(column_pb, field.get(),
+
tuple_desc, array);
+ }
+ test_direct_copy_array(field.get(), arrays);
+ test_write_and_read_column<segment_v2::DEFAULT_ENCODING,
segment_v2::DICT_ENCODING>(
+ column_pb, field.get(), arrays);
+}
+
+TEST_F(ArrayTest, TestNestedStringArrays) {
+ auto column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", "VARCHAR");
+ auto type_info = get_type_info(column_pb);
+ auto field = create_field(column_pb);
+ auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
+ ASSERT_EQ(tuple_desc->slots().size(), 1);
+ auto type_desc = tuple_desc->slots().front()->type();
+
+ std::vector<const CollectionValue*> arrays = {
+ parse(_object_pool, "[]", type_desc),
+ parse(_object_pool, "[[]]", type_desc),
+ parse(_object_pool, "[[[]]]", type_desc),
+ parse(_object_pool, "[null, [null], [[null]]]", type_desc),
+ parse(_object_pool, "[[[\"a\", null, \"c\"], [\"d\", \"e\",
\"f\"]], null, [[\"g\"]]]",
+ type_desc),
+ };
+ for (auto array : arrays) {
+ test_array<segment_v2::DEFAULT_ENCODING,
segment_v2::DICT_ENCODING>(column_pb, field.get(),
+
tuple_desc, array);
+ }
+ test_direct_copy_array(field.get(), arrays);
+ test_write_and_read_column<segment_v2::DEFAULT_ENCODING,
segment_v2::DICT_ENCODING>(
+ column_pb, field.get(), arrays);
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index eb45de6..2ab61c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -373,6 +373,12 @@ public class Column implements Writable {
childrenTColumnType.setIndexLen(children.getOlapColumnIndexSize());
childrenTColumn.setColumnType(childrenTColumnType);
childrenTColumn.setIsAllowNull(children.isAllowNull());
+ // TODO: If we don't set the aggregate type for children, the type will be
+ // considered as TAggregationType::SUM after deserializing in BE.
+ // For now, we make children inherit the aggregate type from their parent.
+ if (tColumn.getAggregationType() != null) {
+
childrenTColumn.setAggregationType(tColumn.getAggregationType());
+ }
tColumn.setChildrenColumn(new ArrayList<>());
tColumn.children_column.add(childrenTColumn);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]