This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push:
new 553db1844da [fix] (variant) add serialize and deserialize (#45487)
553db1844da is described below
commit 553db1844da808caf4de3609a44b7b9d6bd87fc5
Author: Sun Chenyang <[email protected]>
AuthorDate: Mon Dec 16 21:37:36 2024 +0800
[fix] (variant) add serialize and deserialize (#45487)
---
.../rowset/segment_v2/hierarchical_data_reader.cpp | 2 +-
.../rowset/segment_v2/hierarchical_data_reader.h | 4 +-
be/src/vec/columns/column_object.cpp | 367 +++++++++++++--------
be/src/vec/columns/column_object.h | 38 +--
.../vec/data_types/serde/data_type_array_serde.cpp | 18 +
.../vec/data_types/serde/data_type_array_serde.h | 3 +
.../vec/data_types/serde/data_type_jsonb_serde.cpp | 12 +
.../vec/data_types/serde/data_type_jsonb_serde.h | 3 +
.../data_types/serde/data_type_nullable_serde.cpp | 15 +
.../data_types/serde/data_type_nullable_serde.h | 3 +
.../data_types/serde/data_type_number_serde.cpp | 10 +
.../vec/data_types/serde/data_type_number_serde.h | 3 +
be/src/vec/data_types/serde/data_type_serde.h | 2 +-
.../vec/data_types/serde/data_type_string_serde.h | 12 +
14 files changed, 331 insertions(+), 161 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
index db6bac6b8b4..c85e4b429ad 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -206,7 +206,7 @@ Status
ExtractReader::extract_to(vectorized::MutableColumnPtr& dst, size_t nrows
""},
expected_type, &cast_column));
variant.get_root()->insert_range_from(*cast_column, 0, nrows);
- variant.set_num_rows(variant.get_root()->size());
+ // variant.set_num_rows(variant.get_root()->size());
}
if (dst->is_nullable()) {
// fill nullmap
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index f85038713ca..6c8ced89cd2 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -240,10 +240,10 @@ private:
src_null_map.clear();
assert_cast<ColumnObject&>(
assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column())
- .clear_subcolumns_data();
+ .clear_column_data();
} else {
ColumnObject& root_column =
assert_cast<ColumnObject&>(*_root_reader->column);
- root_column.clear_subcolumns_data();
+ root_column.clear_column_data();
}
} else {
if (dst->is_nullable()) {
diff --git a/be/src/vec/columns/column_object.cpp
b/be/src/vec/columns/column_object.cpp
index c1a50f6064b..9d6e260724b 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -616,18 +616,20 @@ bool ColumnObject::Subcolumn::is_finalized() const {
}
template <typename Func>
-MutableColumnPtr ColumnObject::apply_for_subcolumns(Func&& func) const {
+MutableColumnPtr ColumnObject::apply_for_columns(Func&& func) const {
if (!is_finalized()) {
auto finalized = clone_finalized();
auto& finalized_object = assert_cast<ColumnObject&>(*finalized);
- return finalized_object.apply_for_subcolumns(std::forward<Func>(func));
+ return finalized_object.apply_for_columns(std::forward<Func>(func));
}
auto res = ColumnObject::create(is_nullable, false);
for (const auto& subcolumn : subcolumns) {
- auto new_subcolumn = func(subcolumn->data.get_finalized_column());
+ auto new_subcolumn = func(subcolumn->data.get_finalized_column_ptr());
res->add_sub_column(subcolumn->path, new_subcolumn->assume_mutable(),
subcolumn->data.get_least_common_type());
}
+ auto sparse_column = func(serialized_sparse_column);
+ res->serialized_sparse_column = sparse_column->assume_mutable();
check_consistency();
return res;
}
@@ -642,6 +644,7 @@ void ColumnObject::resize(size_t n) {
for (auto& subcolumn : subcolumns) {
subcolumn->data.pop_back(num_rows - n);
}
+ serialized_sparse_column->pop_back(num_rows - n);
}
num_rows = n;
}
@@ -809,8 +812,13 @@ ColumnObject::ColumnObject(Subcolumns&& subcolumns_, bool
is_nullable_)
check_consistency();
}
+// Build a variant column with no subcolumns (so no root) holding `size` default rows.
+// The parameter is named `size` (matching the header declaration) so it does not shadow
+// the `num_rows` member that insert_many_defaults/check_consistency operate on.
+ColumnObject::ColumnObject(size_t size) : is_nullable(true) {
+    insert_many_defaults(size);
+    check_consistency();
+}
+
void ColumnObject::check_consistency() const {
- if (subcolumns.empty()) {
+ if (subcolumns.empty() && serialized_sparse_column->empty()) {
return;
}
for (const auto& leaf : subcolumns) {
@@ -820,6 +828,11 @@ void ColumnObject::check_consistency() const {
leaf->path.get_path(), num_rows,
leaf->data.size());
}
}
+ if (num_rows != serialized_sparse_column->size()) {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "unmatched sparse column:, expeted rows: {},
but meet: {}", num_rows,
+ serialized_sparse_column->size());
+ }
}
size_t ColumnObject::size() const {
@@ -835,13 +848,11 @@ MutableColumnPtr ColumnObject::clone_resized(size_t
new_size) const {
}
// If subcolumns are empty, then res will be empty but new_size > 0
if (subcolumns.empty()) {
- // Add an emtpy column with new_size rows
- auto res = ColumnObject::create(true, false);
- res->set_num_rows(new_size);
+ auto res = ColumnObject::create(new_size);
return res;
}
- auto res = apply_for_subcolumns(
- [&](const auto& subcolumn) { return
subcolumn.clone_resized(new_size); });
+ auto res = apply_for_columns(
+ [&](const ColumnPtr column) { return
column->clone_resized(new_size); });
return res;
}
@@ -850,6 +861,7 @@ size_t ColumnObject::byte_size() const {
for (const auto& entry : subcolumns) {
res += entry->data.byteSize();
}
+ res += serialized_sparse_column->byte_size();
return res;
}
@@ -858,6 +870,7 @@ size_t ColumnObject::allocated_bytes() const {
for (const auto& entry : subcolumns) {
res += entry->data.allocatedBytes();
}
+ res += serialized_sparse_column->allocated_bytes();
return res;
}
@@ -940,6 +953,7 @@ void ColumnObject::insert_default() {
for (auto& entry : subcolumns) {
entry->data.insert_default();
}
+ serialized_sparse_column->insert_default();
++num_rows;
}
@@ -1000,16 +1014,18 @@ void
ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std:
// remove default
row -= num_of_defaults_in_prefix;
- is_null = false;
for (size_t i = 0; i < data.size(); ++i) {
const auto& part = data[i];
if (row < part->size()) {
- // insert key
- key->insert_data(path.data(), path.size());
- // insert value
- const auto& part_type = data_types[i];
- const auto& serde = part_type->get_serde();
- serde->write_one_cell_to_binary(*part, value, row);
+ if (assert_cast<const ColumnNullable&>(*part).is_null_at(row)) {
+ is_null = true;
+ } else {
+ is_null = false;
+ // insert key
+ key->insert_data(path.data(), path.size());
+ // insert value
+ data_types[i]->get_serde()->write_one_cell_to_binary(*part,
value, row);
+ }
return;
}
@@ -1020,6 +1036,110 @@ void
ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std:
"Index ({}) for serialize to sparse column is out
of range", row);
}
+// Decode one value of `type` starting at `data` (its type tag byte has already been
+// consumed by the caller) into `res`, and record its leaf scalar type and array depth in
+// `info_res`. Returns a pointer one past the last byte consumed.
+// Payload layout by type:
+//   String / JSONB  : size_t byte length, then that many bytes
+//   Int8 .. Float64 : the raw native-width value
+//   Array           : size_t element count, then per element a null-flag byte and,
+//                     unless null, a type tag byte followed by that element's payload
+// Throws OUT_OF_BOUND for any other type.
+// NOTE(review): the multi-byte loads below go through reinterpret_cast on possibly
+// unaligned addresses (payloads follow 1-byte tags); memcpy would avoid alignment and
+// aliasing UB — confirm target platforms tolerate this.
+const char* parse_binary_from_sparse_column(TypeIndex type, const char* data, Field& res,
+                                            FieldInfo& info_res) {
+    // Scalars report their own type; an array instead reports its elements' leaf type.
+    if (type != TypeIndex::Array) {
+        info_res.scalar_type_id = type;
+    }
+    const char* end = data;
+    switch (type) {
+    case TypeIndex::String: {
+        const size_t size = *reinterpret_cast<const size_t*>(data);
+        data += sizeof(size_t);
+        res = Field(String(data, size));
+        end = data + size;
+        break;
+    }
+    case TypeIndex::Int8: {
+        res = *reinterpret_cast<const Int8*>(data);
+        end = data + sizeof(Int8);
+        break;
+    }
+    case TypeIndex::Int16: {
+        res = *reinterpret_cast<const Int16*>(data);
+        end = data + sizeof(Int16);
+        break;
+    }
+    case TypeIndex::Int32: {
+        res = *reinterpret_cast<const Int32*>(data);
+        end = data + sizeof(Int32);
+        break;
+    }
+    case TypeIndex::Int64: {
+        res = *reinterpret_cast<const Int64*>(data);
+        end = data + sizeof(Int64);
+        break;
+    }
+    case TypeIndex::Float32: {
+        res = *reinterpret_cast<const Float32*>(data);
+        end = data + sizeof(Float32);
+        break;
+    }
+    case TypeIndex::Float64: {
+        res = *reinterpret_cast<const Float64*>(data);
+        end = data + sizeof(Float64);
+        break;
+    }
+    case TypeIndex::JSONB: {
+        const size_t size = *reinterpret_cast<const size_t*>(data);
+        data += sizeof(size_t);
+        res = JsonbField(data, size);
+        end = data + size;
+        break;
+    }
+    case TypeIndex::Array: {
+        const size_t size = *reinterpret_cast<const size_t*>(data);
+        data += sizeof(size_t);
+        // Start empty and reserve: Array(size) would pre-fill `size` default elements and
+        // the emplace_back calls below would append after them, doubling the length.
+        res = Array();
+        vectorized::Array& array = res.get<Array>();
+        array.reserve(size);
+        // This level adds exactly one dimension; nested arrays may add more. Keep the
+        // deepest element rather than incrementing once per element.
+        const auto element_dims = info_res.num_dimensions + 1;
+        auto max_dims = element_dims;
+        for (size_t i = 0; i < size; ++i) {
+            const uint8_t is_null = *reinterpret_cast<const uint8_t*>(data++);
+            if (is_null) {
+                array.emplace_back(Null());
+                continue;
+            }
+            Field nested_field;
+            // The tag is a raw byte; convert it to the enum with static_cast
+            // (assert_cast is a checked cast between column types, not integer->enum).
+            const auto nested_type =
+                    static_cast<TypeIndex>(*reinterpret_cast<const uint8_t*>(data++));
+            info_res.num_dimensions = element_dims;
+            data = parse_binary_from_sparse_column(nested_type, data, nested_field, info_res);
+            if (info_res.num_dimensions > max_dims) {
+                max_dims = info_res.num_dimensions;
+            }
+            array.emplace_back(std::move(nested_field));
+        }
+        info_res.num_dimensions = max_dims;
+        end = data;
+        break;
+    }
+    default:
+        throw doris::Exception(ErrorCode::OUT_OF_BOUND,
+                               "Type ({}) for deserialize_from_sparse_column is invalid", type);
+    }
+    return end;
+}
+
+// Decode row `row` of the sparse column's binary value column into a Field plus the
+// FieldInfo passed on to Subcolumn::insert.
+// Layout: a null-flag byte (non-zero means null and nothing follows), then a type tag
+// byte, then the payload decoded by parse_binary_from_sparse_column.
+std::pair<Field, FieldInfo> ColumnObject::deserialize_from_sparse_column(const ColumnString* value,
+                                                                         size_t row) const {
+    const auto& data_ref = value->get_data_at(row);
+    const char* data = data_ref.data;
+    DCHECK(data_ref.size > 0);
+
+    // A scalar value has zero dimensions; only array payloads add dimensions.
+    FieldInfo info_res = {
+            .scalar_type_id = TypeIndex::Nothing,
+            .have_nulls = false,
+            .need_convert = false,
+            .num_dimensions = 0,
+    };
+    // Non-zero flag byte means the value is null.
+    const uint8_t is_null = *reinterpret_cast<const uint8_t*>(data++);
+    if (is_null) {
+        DCHECK(data_ref.size == 1);
+        return {Null(), info_res};
+    }
+
+    DCHECK(data_ref.size > 1);
+    // The tag is a raw byte; static_cast converts it to the enum (assert_cast is a checked
+    // cast between column types and rejects integer -> enum in debug builds).
+    const auto type = static_cast<TypeIndex>(*reinterpret_cast<const uint8_t*>(data++));
+    info_res.scalar_type_id = type;
+    Field res;
+    const char* end = parse_binary_from_sparse_column(type, data, res, info_res);
+    DCHECK_EQ(end - data_ref.data, data_ref.size);
+    return {std::move(res), info_res};
+}
+
Field ColumnObject::operator[](size_t n) const {
Field object;
get(n, object);
@@ -1043,6 +1163,18 @@ void ColumnObject::get(size_t n, Field& res) const {
object.try_emplace(entry->path.get_path(), field);
}
}
+
+ const auto& [path, value] = get_sparse_data_paths_and_values();
+ auto& sparse_column_offsets = serialized_sparse_column_offsets();
+ size_t offset = sparse_column_offsets[n - 1];
+ size_t end = sparse_column_offsets[n];
+ // Iterator over [path, binary value]
+ for (size_t i = offset; i != end; ++i) {
+ const StringRef path_data = path->get_data_at(i);
+ const auto& data = deserialize_from_sparse_column(value, i);
+ object.try_emplace(std::string(path_data.data, path_data.size),
data.first);
+ }
+
if (object.empty()) {
res = Null();
}
@@ -1159,7 +1291,7 @@ void
ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
const auto& src_serialized_sparse_column_offsets =
src.serialized_sparse_column_offsets();
if (src_serialized_sparse_column_offsets[start - 1] ==
src_serialized_sparse_column_offsets[start + length - 1]) {
- size_t current_size = size();
+ size_t current_size = num_rows;
/// If no src subcolumns should be inserted into sparse column, insert
defaults.
if (sorted_src_subcolumn_for_sparse_column.empty()) {
@@ -1228,7 +1360,8 @@ void
ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
const PathInData column_path(src_sparse_path);
if (auto* subcolumn = get_subcolumn(column_path); subcolumn !=
nullptr) {
// Deserialize binary value into subcolumn from src serialized
sparse column data.
-
subcolumn->deserialize_from_sparse_column(src_sparse_column_values, i);
+ const auto& data =
src.deserialize_from_sparse_column(src_sparse_column_values, i);
+ subcolumn->insert(data.first, data.second);
} else {
// Before inserting this path into sparse column check if we
need to
// insert suibcolumns from
sorted_src_subcolumn_for_sparse_column before.
@@ -1284,16 +1417,6 @@ void
ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
return;
}
-ColumnPtr ColumnObject::replicate(const Offsets& offsets) const {
- if (subcolumns.empty()) {
- // Add an emtpy column with offsets.back rows
- auto res = ColumnObject::create(true, false);
- res->set_num_rows(offsets.back());
- }
- return apply_for_subcolumns(
- [&](const auto& subcolumn) { return subcolumn.replicate(offsets);
});
-}
-
ColumnPtr ColumnObject::permute(const Permutation& perm, size_t limit) const {
if (subcolumns.empty()) {
if (limit == 0) {
@@ -1306,19 +1429,17 @@ ColumnPtr ColumnObject::permute(const Permutation&
perm, size_t limit) const {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"Size of permutation is less than
required.");
}
- // Add an emtpy column with limit rows
- auto res = ColumnObject::create(true, false);
- res->set_num_rows(limit);
+ auto res = ColumnObject::create(limit);
return res;
}
- return apply_for_subcolumns(
- [&](const auto& subcolumn) { return subcolumn.permute(perm,
limit); });
+ return apply_for_columns([&](const ColumnPtr column) { return
column->permute(perm, limit); });
}
void ColumnObject::pop_back(size_t length) {
for (auto& entry : subcolumns) {
entry->data.pop_back(length);
}
+ serialized_sparse_column->pop_back(length);
num_rows -= length;
}
@@ -1441,15 +1562,6 @@ bool ColumnObject::add_sub_column(const PathInData& key,
size_t new_size) {
return true;
}
-PathsInData ColumnObject::getKeys() const {
- PathsInData keys;
- keys.reserve(subcolumns.size());
- for (const auto& entry : subcolumns) {
- keys.emplace_back(entry->path);
- }
- return keys;
-}
-
bool ColumnObject::is_finalized() const {
return std::all_of(subcolumns.begin(), subcolumns.end(),
[](const auto& entry) { return
entry->data.is_finalized(); });
@@ -1704,8 +1816,6 @@ Status ColumnObject::serialize_sparse_columns(
std::map<std::string_view, Subcolumn>&& remaing_subcolumns) {
CHECK(is_finalized());
- serialized_sparse_column = ColumnMap::create(ColumnString::create(),
ColumnString::create(),
-
ColumnArray::ColumnOffsets::create());
if (remaing_subcolumns.empty()) {
serialized_sparse_column->insert_many_defaults(num_rows);
return Status::OK();
@@ -1784,39 +1894,6 @@ Status ColumnObject::finalize(FinalizeMode mode) {
new_subcolumns.get_mutable_root()->data.finalize(mode);
}
- // pick sparse columns
- std::set<std::string_view> selected_path;
- std::vector<std::string_view> remaining_path;
- if (subcolumns.size() > MAX_SUBCOLUMNS) {
- // pick subcolumns sort by size of none null values
- std::unordered_map<std::string_view, size_t> none_null_value_sizes;
- // 1. get the none null value sizes
- for (auto&& entry : subcolumns) {
- if (entry->data.is_root) {
- continue;
- }
- size_t size = entry->data.get_non_null_value_size();
- none_null_value_sizes[entry->path.get_path()] = size;
- }
- // 2. sort by the size
- std::vector<std::pair<std::string_view, size_t>> sorted_by_size(
- none_null_value_sizes.begin(), none_null_value_sizes.end());
- std::sort(sorted_by_size.begin(), sorted_by_size.end(),
- [](const auto& a, const auto& b) { return a.second >
b.second; });
-
- // 3. pick MAX_SUBCOLUMNS selected subcolumns
- for (size_t i = 0; i < std::min(MAX_SUBCOLUMNS,
sorted_by_size.size()); ++i) {
- selected_path.insert(sorted_by_size[i].first);
- }
-
- // 4. put remaining subcolumns to remaining_subcolumns
- for (const auto& entry : sorted_by_size) {
- if (selected_path.find(entry.first) == selected_path.end()) {
- remaining_path.emplace_back(entry.first);
- }
- }
- }
-
// finalize all subcolumns
for (auto&& entry : subcolumns) {
const auto& least_common_type = entry->data.get_least_common_type();
@@ -1840,24 +1917,57 @@ Status ColumnObject::finalize(FinalizeMode mode) {
}
}
- // add selected subcolumns to new_subcolumns
- for (auto&& entry : subcolumns) {
- if (selected_path.find(entry->path.get_path()) != selected_path.end())
{
- new_subcolumns.add(entry->path, entry->data);
+ // merge and encode sparse column
+ if (mode == FinalizeMode::WRITE_MODE) {
+ // pick sparse columns
+ std::set<std::string_view> selected_path;
+ std::vector<std::string_view> remaining_path;
+ if (subcolumns.size() > MAX_SUBCOLUMNS) {
+ // pick subcolumns sort by size of none null values
+ std::unordered_map<std::string_view, size_t> none_null_value_sizes;
+ // 1. get the none null value sizes
+ for (auto&& entry : subcolumns) {
+ if (entry->data.is_root) {
+ continue;
+ }
+ size_t size = entry->data.get_non_null_value_size();
+ none_null_value_sizes[entry->path.get_path()] = size;
+ }
+ // 2. sort by the size
+ std::vector<std::pair<std::string_view, size_t>> sorted_by_size(
+ none_null_value_sizes.begin(),
none_null_value_sizes.end());
+ std::sort(sorted_by_size.begin(), sorted_by_size.end(),
+ [](const auto& a, const auto& b) { return a.second >
b.second; });
+
+ // 3. pick MAX_SUBCOLUMNS selected subcolumns
+ for (size_t i = 0; i < std::min(MAX_SUBCOLUMNS,
sorted_by_size.size()); ++i) {
+ selected_path.insert(sorted_by_size[i].first);
+ }
+
+ // 4. put remaining subcolumns to remaining_subcolumns
+ for (const auto& entry : sorted_by_size) {
+ if (selected_path.find(entry.first) == selected_path.end()) {
+ remaining_path.emplace_back(entry.first);
+ }
+ }
+ }
+ // add selected subcolumns to new_subcolumns
+ for (auto&& entry : subcolumns) {
+ if (selected_path.find(entry->path.get_path()) !=
selected_path.end()) {
+ new_subcolumns.add(entry->path, entry->data);
+ }
}
- }
- std::map<std::string_view, Subcolumn> remaing_subcolumns;
- // merge remaining subcolumns to sparse_column
- for (auto&& entry : subcolumns) {
- if (selected_path.find(entry->path.get_path()) != selected_path.end())
{
- remaing_subcolumns.emplace(entry->path.get_path(), entry->data);
+ std::map<std::string_view, Subcolumn> remaing_subcolumns;
+ // merge remaining subcolumns to sparse_column
+ for (auto&& entry : subcolumns) {
+ if (selected_path.find(entry->path.get_path()) !=
selected_path.end()) {
+ remaing_subcolumns.emplace(entry->path.get_path(),
entry->data);
+ }
}
+
RETURN_IF_ERROR(serialize_sparse_columns(std::move(remaing_subcolumns)));
}
- // merge and encode sparse column
- RETURN_IF_ERROR(serialize_sparse_columns(std::move(remaing_subcolumns)));
-
std::swap(subcolumns, new_subcolumns);
doc_structure = nullptr;
_prev_positions.clear();
@@ -1894,6 +2004,7 @@ ColumnPtr get_base_column_of_array(const ColumnPtr&
column) {
return column;
}
+// ----
ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const {
if (!is_finalized()) {
auto finalized = clone_finalized();
@@ -1901,9 +2012,7 @@ ColumnPtr ColumnObject::filter(const Filter& filter,
ssize_t count) const {
return finalized_object.filter(filter, count);
}
if (subcolumns.empty()) {
- // Add an emtpy column with filtered rows
- auto res = ColumnObject::create(true, false);
- res->set_num_rows(count_bytes_in_filter(filter));
+ auto res = ColumnObject::create(count_bytes_in_filter(filter));
return res;
}
auto new_column = ColumnObject::create(true, false);
@@ -1912,35 +2021,17 @@ ColumnPtr ColumnObject::filter(const Filter& filter,
ssize_t count) const {
new_column->add_sub_column(entry->path, subcolumn->assume_mutable(),
entry->data.get_least_common_type());
}
+ // filter
return new_column;
}
-Status ColumnObject::filter_by_selector(const uint16_t* sel, size_t sel_size,
IColumn* col_ptr) {
- if (!is_finalized()) {
- finalize();
- }
- if (subcolumns.empty()) {
- assert_cast<ColumnObject*>(col_ptr)->insert_many_defaults(sel_size);
- return Status::OK();
- }
- auto* res = assert_cast<ColumnObject*>(col_ptr);
- for (const auto& subcolumn : subcolumns) {
- auto new_subcolumn =
subcolumn->data.get_least_common_type()->create_column();
-
RETURN_IF_ERROR(subcolumn->data.get_finalized_column().filter_by_selector(
- sel, sel_size, new_subcolumn.get()));
- res->add_sub_column(subcolumn->path, new_subcolumn->assume_mutable(),
- subcolumn->data.get_least_common_type());
- }
- return Status::OK();
-}
-
size_t ColumnObject::filter(const Filter& filter) {
if (!is_finalized()) {
finalize();
}
size_t count = filter.size() -
simd::count_zero_num((int8_t*)filter.data(), filter.size());
if (count == 0) {
- for_each_subcolumn([](auto& part) { part->clear(); });
+ clear();
} else {
for_each_subcolumn([&](auto& part) {
if (part->size() != count) {
@@ -1958,6 +2049,14 @@ size_t ColumnObject::filter(const Filter& filter) {
}
}
});
+ const auto result_size = serialized_sparse_column->filter(filter);
+ if (result_size != count) {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "result_size not euqal with filter_size,
result_size={}, "
+ "filter_size={}",
+ result_size, count);
+ }
+ CHECK_EQ(result_size, count);
}
num_rows = count;
#ifndef NDEBUG
@@ -1966,7 +2065,7 @@ size_t ColumnObject::filter(const Filter& filter) {
return count;
}
-void ColumnObject::clear_subcolumns_data() {
+void ColumnObject::clear_column_data() {
for (auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
DCHECK_EQ(part->use_count(), 1);
@@ -1974,12 +2073,14 @@ void ColumnObject::clear_subcolumns_data() {
}
entry->data.num_of_defaults_in_prefix = 0;
}
+ serialized_sparse_column->clear();
num_rows = 0;
}
void ColumnObject::clear() {
Subcolumns empty;
std::swap(empty, subcolumns);
+ serialized_sparse_column->clear();
num_rows = 0;
_prev_positions.clear();
}
@@ -2063,61 +2164,53 @@ void ColumnObject::insert_indices_from(const IColumn&
src, const uint32_t* indic
}
}
-void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback
callback) const {
+template <typename Func>
+void ColumnObject::for_each_imutable_column(Func&& callback) const {
if (!is_finalized()) {
auto finalized = clone_finalized();
auto& finalized_object = assert_cast<ColumnObject&>(*finalized);
- finalized_object.for_each_imutable_subcolumn(callback);
+ finalized_object.for_each_imutable_column(callback);
return;
}
for (const auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
- callback(*part);
+ callback(part);
}
}
-}
-
-bool ColumnObject::is_exclusive() const {
- bool is_exclusive = IColumn::is_exclusive();
- for_each_imutable_subcolumn([&](const auto& subcolumn) {
- if (!subcolumn.is_exclusive()) {
- is_exclusive = false;
- }
- });
- return is_exclusive;
+ callback(serialized_sparse_column);
}
void ColumnObject::update_hash_with_value(size_t n, SipHash& hash) const {
- for_each_imutable_subcolumn(
- [&](const auto& subcolumn) { return
subcolumn.update_hash_with_value(n, hash); });
+ for_each_imutable_column(
+ [&](const ColumnPtr column) { return
column->update_hash_with_value(n, hash); });
}
void ColumnObject::update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict
null_data) const {
- for_each_imutable_subcolumn([&](const auto& subcolumn) {
- return subcolumn.update_hashes_with_value(hashes, nullptr);
+ for_each_imutable_column([&](const ColumnPtr column) {
+ return column->update_hashes_with_value(hashes, nullptr);
});
}
void ColumnObject::update_xxHash_with_value(size_t start, size_t end,
uint64_t& hash,
const uint8_t* __restrict
null_data) const {
- for_each_imutable_subcolumn([&](const auto& subcolumn) {
- return subcolumn.update_xxHash_with_value(start, end, hash, nullptr);
+ for_each_imutable_column([&](const ColumnPtr column) {
+ return column->update_xxHash_with_value(start, end, hash, nullptr);
});
}
void ColumnObject::update_crcs_with_value(uint32_t* __restrict hash,
PrimitiveType type,
uint32_t rows, uint32_t offset,
const uint8_t* __restrict null_data)
const {
- for_each_imutable_subcolumn([&](const auto& subcolumn) {
- return subcolumn.update_crcs_with_value(hash, type, rows, offset,
nullptr);
+ for_each_imutable_column([&](const ColumnPtr column) {
+ return column->update_crcs_with_value(hash, type, rows, offset,
nullptr);
});
}
void ColumnObject::update_crc_with_value(size_t start, size_t end, uint32_t&
hash,
const uint8_t* __restrict null_data)
const {
- for_each_imutable_subcolumn([&](const auto& subcolumn) {
- return subcolumn.update_crc_with_value(start, end, hash, nullptr);
+ for_each_imutable_column([&](const ColumnPtr column) {
+ return column->update_crc_with_value(start, end, hash, nullptr);
});
}
diff --git a/be/src/vec/columns/column_object.h
b/be/src/vec/columns/column_object.h
index 38ed5478f02..72cc783caf8 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -98,7 +98,7 @@ public:
constexpr static TypeIndex MOST_COMMON_TYPE_ID = TypeIndex::JSONB;
// Nullable(Array(Nullable(Object)))
const static DataTypePtr NESTED_TYPE;
- const size_t MAX_SUBCOLUMNS = 200;
+ const size_t MAX_SUBCOLUMNS = 3;
// Finlize mode for subcolumns, write mode will estimate which subcolumns
are sparse columns(too many null values inside column),
// merge and encode them into a shared column in root column. Only affects
in flush block to segments.
// Otherwise read mode should be as default mode.
@@ -185,9 +185,6 @@ public:
void serialize_to_sparse_column(ColumnString* key, std::string_view
path,
ColumnString* value, size_t row, bool&
is_null);
- // Deserialize the i-th row of the column from the sparse column.
- void deserialize_from_sparse_column(const ColumnString* value, size_t
row) {}
-
friend class ColumnObject;
private:
@@ -263,7 +260,8 @@ private:
// It's filled when the number of subcolumns reaches the limit.
// It has type Map(String, String) and stores a map (path, binary
serialized subcolumn value) for each row.
- WrappedPtr serialized_sparse_column;
+ WrappedPtr serialized_sparse_column = ColumnMap::create(
+ ColumnString::create(), ColumnString::create(),
ColumnArray::ColumnOffsets::create());
public:
static constexpr auto COLUMN_NAME_DUMMY = "_dummy";
@@ -272,6 +270,9 @@ public:
explicit ColumnObject(bool is_nullable_, DataTypePtr type,
MutableColumnPtr&& column);
+ // create without root, num_rows = size
+ explicit ColumnObject(size_t size);
+
ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_);
~ColumnObject() override = default;
@@ -316,8 +317,6 @@ public:
// Only single scalar root column
bool is_scalar_variant() const;
- bool is_exclusive() const override;
-
ColumnPtr get_root() const { return
subcolumns.get_root()->data.get_finalized_column_ptr(); }
bool has_subcolumn(const PathInData& key) const;
@@ -365,8 +364,6 @@ public:
return serialized_sparse_column->convert_to_full_column_if_const();
}
- PathsInData getKeys() const;
-
// use sparse_subcolumns_schema to record sparse column's path info and
type
Status finalize(FinalizeMode mode);
@@ -385,7 +382,7 @@ public:
void resize(size_t n) override;
- void clear_subcolumns_data();
+ void clear_column_data();
std::string get_name() const override {
if (is_scalar_variant()) {
@@ -416,8 +413,6 @@ public:
void insert_default() override;
- ColumnPtr replicate(const Offsets& offsets) const override;
-
void pop_back(size_t length) override;
Field operator[](size_t n) const override;
@@ -428,8 +423,6 @@ public:
ColumnPtr filter(const Filter&, ssize_t) const override;
- Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn*
col_ptr) override;
-
size_t filter(const Filter&) override;
ColumnPtr permute(const Permutation&, size_t) const override;
@@ -437,7 +430,7 @@ public:
bool is_variable_length() const override { return true; }
template <typename Func>
- MutableColumnPtr apply_for_subcolumns(Func&& func) const;
+ MutableColumnPtr apply_for_columns(Func&& func) const;
// Extract path from root column and output to dst
Status extract_root(const PathInData& path, MutableColumnPtr& dst) const;
@@ -462,6 +455,10 @@ public:
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const
override;
+ ColumnPtr replicate(const Offsets& offsets) const override {
+ throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "replicate" +
get_name());
+ }
+
Int64 get_int(size_t /*n*/) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "get_int" +
get_name());
}
@@ -529,10 +526,6 @@ public:
"deserialize_vec_with_null_map" + get_name());
}
- Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn*
col_ptr) const {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"filter_by_selector" + get_name());
- }
-
bool structure_equals(const IColumn&) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"structure_equals" + get_name());
}
@@ -579,6 +572,10 @@ public:
return {&key, &value};
}
+ // Deserialize the i-th row of the column from the sparse column.
+ std::pair<Field, FieldInfo> deserialize_from_sparse_column(const
ColumnString* value,
+ size_t row)
const;
+
private:
// May throw execption
void try_insert(const Field& field);
@@ -586,7 +583,8 @@ private:
/// It's used to get shared sized of Nested to insert correct default
values.
const Subcolumns::Node* get_leaf_of_the_same_nested(const
Subcolumns::NodePtr& entry) const;
- void for_each_imutable_subcolumn(ImutableColumnCallback callback) const;
+ template <typename Func>
+ void for_each_imutable_column(Func&& callback) const;
// return null if not found
const Subcolumn* get_subcolumn_with_cache(const PathInData& key, size_t
index_hint) const;
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp
b/be/src/vec/data_types/serde/data_type_array_serde.cpp
index a56eb00dbdd..1b812781805 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp
@@ -434,5 +434,23 @@ Status DataTypeArraySerDe::read_column_from_pb(IColumn&
column, const PValues& a
}
return Status::OK();
}
+
+// Encode row `row_num` of an array column: the Array type tag byte, the element count as
+// a size_t, then every element of the row encoded by the nested serde, in order.
+void DataTypeArraySerDe::write_one_cell_to_binary(const IColumn& src_column,
+                                                  ColumnString* dst_column,
+                                                  int64_t row_num) const {
+    const auto type_tag = static_cast<uint8_t>(TypeIndex::Array);
+    dst_column->insert_data(reinterpret_cast<const char*>(&type_tag), sizeof(uint8_t));
+
+    const auto& array_column = assert_cast<const ColumnArray&>(src_column);
+    const auto& row_offsets = array_column.get_offsets();
+    // NOTE(review): for row_num == 0 this reads row_offsets[-1]; presumably the offsets
+    // container yields 0 there — confirm.
+    const size_t first = row_offsets[row_num - 1];
+    const size_t last = row_offsets[row_num];
+
+    const size_t element_count = last - first;
+    dst_column->insert_data(reinterpret_cast<const char*>(&element_count), sizeof(size_t));
+
+    const IColumn& elements = array_column.get_data();
+    for (size_t idx = first; idx != last; ++idx) {
+        nested_serde->write_one_cell_to_binary(elements, dst_column, idx);
+    }
+}
+
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h
b/be/src/vec/data_types/serde/data_type_array_serde.h
index 5b15f48f502..aaf1a425512 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.h
+++ b/be/src/vec/data_types/serde/data_type_array_serde.h
@@ -101,6 +101,9 @@ public:
nested_serde->set_return_object_as_string(value);
}
+ void write_one_cell_to_binary(const IColumn& src_column, ColumnString*
dst_column,
+ int64_t row_num) const override;
+
private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
index 10218e4164d..f56bccc298c 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
@@ -277,5 +277,17 @@ Status DataTypeJsonbSerDe::read_column_from_pb(IColumn&
column, const PValues& a
}
return Status::OK();
}
+
+// Encode row `row_num` of a JSONB column as: the JSONB type tag byte, the payload length
+// as a size_t, then the raw JSONB bytes.
+void DataTypeJsonbSerDe::write_one_cell_to_binary(const IColumn& src_column,
+                                                  ColumnString* dst_column,
+                                                  int64_t row_num) const {
+    const auto& jsonb_column = assert_cast<const ColumnString&>(src_column);
+    const auto cell = jsonb_column.get_data_at(row_num);
+    const size_t payload_size = cell.size;
+    const auto type_tag = static_cast<uint8_t>(TypeIndex::JSONB);
+
+    dst_column->insert_data(reinterpret_cast<const char*>(&type_tag), sizeof(uint8_t));
+    dst_column->insert_data(reinterpret_cast<const char*>(&payload_size), sizeof(size_t));
+    dst_column->insert_data(cell.data, payload_size);
+}
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.h
b/be/src/vec/data_types/serde/data_type_jsonb_serde.h
index 5080b1ba46e..d6d29cce556 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.h
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.h
@@ -71,6 +71,9 @@ public:
int64_t end) const override;
Status read_column_from_pb(IColumn& column, const PValues& arg) const
override;
+ void write_one_cell_to_binary(const IColumn& src_column, ColumnString*
dst_column,
+ int64_t row_num) const override; // Serializes row row_num as [JSONB tag][size_t length][bytes]; defined in data_type_jsonb_serde.cpp.
+
private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
index f21f160fb0a..d45b39c6d63 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -393,5 +393,20 @@ Status
DataTypeNullableSerDe::read_one_cell_from_json(IColumn& column,
return Status::OK();
}
+void DataTypeNullableSerDe::write_one_cell_to_binary(const IColumn& src_column, // Writes a 1-byte null flag, then (only when not NULL) the nested serde's encoding of row_num.
+ ColumnString* dst_column,
+ int64_t row_num) const {
+ const auto& col = assert_cast<const ColumnNullable&>(src_column);
+ uint8_t is_null = 0; // null flag: 1 = NULL (nothing follows), 0 = nested value follows
+ if (col.is_null_at(row_num)) [[unlikely]] {
+ is_null = 1;
+ dst_column->insert_data(reinterpret_cast<const char*>(&is_null),
sizeof(uint8_t)); // pass the flag's address; casting its value would yield pointer 0/1 and crash
+ } else {
+ dst_column->insert_data(reinterpret_cast<const char*>(&is_null),
sizeof(uint8_t));
+ const auto& nested_col = col.get_nested_column();
+ nested_serde->write_one_cell_to_binary(nested_col, dst_column,
row_num);
+ }
+}
+
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h
b/be/src/vec/data_types/serde/data_type_nullable_serde.h
index 6051c7f722d..51cbf54eaed 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h
@@ -99,6 +99,9 @@ public:
int64_t row_num) const override;
Status read_one_cell_from_json(IColumn& column, const rapidjson::Value&
result) const override;
+ void write_one_cell_to_binary(const IColumn& src_column, ColumnString*
dst_column,
+ int64_t row_num) const override; // Serializes row row_num as a null flag plus, when not NULL, the nested serde's encoding; defined in data_type_nullable_serde.cpp.
+
private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index 9416fc9a8b3..5ba7fdf293a 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -393,6 +393,16 @@ Status DataTypeNumberSerDe<T>::write_column_to_orc(const
std::string& timezone,
return Status::OK();
}
+template <typename T>
+void DataTypeNumberSerDe<T>::write_one_cell_to_binary(const IColumn&
src_column,
+ ColumnString* dst_column,
+ int64_t row_num) const { // Writes: [uint8 type tag from TypeId<T>][bytes returned by get_data_at(row_num)]; no length prefix.
+ const uint8_t type = static_cast<uint8_t>(TypeId<T>::value); // tag derived from the element type T
+ dst_column->insert_data(reinterpret_cast<const char*>(&type),
sizeof(uint8_t));
+ const auto& data_ref = assert_cast<const
ColumnType&>(src_column).get_data_at(row_num); // raw value bytes of row row_num
+ dst_column->insert_data(data_ref.data, data_ref.size);
+}
+
/// Explicit template instantiations - to avoid code bloat in headers.
template class DataTypeNumberSerDe<UInt8>;
template class DataTypeNumberSerDe<UInt16>;
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h
b/be/src/vec/data_types/serde/data_type_number_serde.h
index 203cd9dbf46..c9073f5e868 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.h
+++ b/be/src/vec/data_types/serde/data_type_number_serde.h
@@ -107,6 +107,9 @@ public:
int64_t row_num) const override;
Status read_one_cell_from_json(IColumn& column, const rapidjson::Value&
result) const override;
+ void write_one_cell_to_binary(const IColumn& src_column, ColumnString*
dst_column,
+ int64_t row_num) const override; // Serializes row row_num as [type tag][raw value bytes]; defined in data_type_number_serde.cpp.
+
private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_serde.h
b/be/src/vec/data_types/serde/data_type_serde.h
index 5b0e8fab65e..4634afa1449 100644
--- a/be/src/vec/data_types/serde/data_type_serde.h
+++ b/be/src/vec/data_types/serde/data_type_serde.h
@@ -338,7 +338,7 @@ public:
virtual Status read_one_cell_from_json(IColumn& column, const
rapidjson::Value& result) const;
virtual void write_one_cell_to_binary(const IColumn& src_column,
ColumnString* dst_column,
- int64_t row_num) {
+ int64_t row_num) const { // const so the derived serdes' `const override` declarations actually override it; default throws NOT_IMPLEMENTED_ERROR.
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_one_cell_to_binary");
}
diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h
b/be/src/vec/data_types/serde/data_type_string_serde.h
index 69a8cc26171..98cf89ada1e 100644
--- a/be/src/vec/data_types/serde/data_type_string_serde.h
+++ b/be/src/vec/data_types/serde/data_type_string_serde.h
@@ -366,6 +366,18 @@ public:
return Status::OK();
}
+ void write_one_cell_to_binary(const IColumn& src_column, ColumnString*
dst_column,
+ int64_t row_num) const override { // Writes: [uint8 type tag = String][size_t length][string bytes of row row_num].
+ const uint8_t type = static_cast<uint8_t>(TypeIndex::String); // 1-byte tag identifying the value as String
+ const auto& col = assert_cast<const ColumnType&>(src_column);
+ const auto& data_ref = col.get_data_at(row_num);
+ const size_t size = data_ref.size; // length prefix, written as a raw native-width size_t
+
+ dst_column->insert_data(reinterpret_cast<const char*>(&type),
sizeof(uint8_t)); // NOTE(review): each insert_data call presumably appends a separate row to dst_column -- confirm the reader expects tag/length/payload as three rows.
+ dst_column->insert_data(reinterpret_cast<const char*>(&size),
sizeof(size_t));
+ dst_column->insert_data(data_ref.data, size);
+ }
+
private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]