This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch feat-nested in repository https://gitbox.apache.org/repos/asf/doris.git
commit 262474b836b023c1bb8f43d2dc24c9f6d3844f52
Author: eldenmoon <[email protected]>
AuthorDate: Fri Jan 9 20:12:44 2026 +0800

    Merge NestedGroup JSONB back into whole Variant reads.

    When reading the whole VARIANT column, reconstruct NestedGroup arrays as JSONB subcolumns and attach them to the output ColumnVariant so data remains visible after discarding JSONB subcolumns during write/compaction.
---
 .../segment_v2/variant/variant_column_reader.cpp | 397 ++++++++++++++++++++-
 1 file changed, 396 insertions(+), 1 deletion(-)

diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
index b44d85f0c00..f3f923a5efe 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
@@ -167,6 +167,53 @@ private:
         return Status::OK();
     }
 
+    // Appends to `dst` (Nullable(JSONB)) one JSON array per row in [row_ord, row_ord+row_cnt); used for
+    // nested groups inside an element object. Offsets are absolute: element i is elem_obj row (i - start_off).
+    Status _append_group_as_jsonb_blob(GroupState& state, ordinal_t row_ord, size_t row_cnt,
+                                       vectorized::MutableColumnPtr& dst) {
+        auto* dst_nullable = assert_cast<vectorized::ColumnNullable*>(dst.get());
+        auto& dst_str = assert_cast<vectorized::ColumnString&>(dst_nullable->get_nested_column());
+        auto& dst_nullmap = dst_nullable->get_null_map_column().get_data();
+
+        uint64_t start_off = 0;
+        std::vector<uint64_t> offsets;
+        RETURN_IF_ERROR(_read_offsets_with_prev(state.offsets_iter.get(), row_ord, row_cnt, &start_off,
+                                                &offsets));
+        uint64_t end_off = offsets.empty() ? start_off : offsets.back();
+        auto elem_count = static_cast<size_t>(end_off - start_off);
+
+        RETURN_IF_ERROR(_seek_child_iters(state, start_off));
+
+        vectorized::MutableColumnPtr elem_obj = vectorized::ColumnVariant::create(0, elem_count);
+        auto* elem_obj_ptr = assert_cast<vectorized::ColumnVariant*>(elem_obj.get());
+        RETURN_IF_ERROR(
+                _fill_elem_object_with_scalar_children(state, start_off, elem_count, elem_obj_ptr));
+        RETURN_IF_ERROR(_fill_elem_object_with_nested_groups(state, start_off, elem_count, elem_obj_ptr));
+
+        for (size_t r = 0; r < offsets.size(); ++r) {
+            uint64_t off = offsets[r];
+            uint64_t prev = (r == 0) ? start_off : offsets[r - 1];
+
+            std::string json;
+            json.push_back('[');
+            for (uint64_t i = prev; i < off; ++i) {
+                if (i > prev) {
+                    json.push_back(',');
+                }
+                std::string obj;
+                elem_obj_ptr->serialize_one_row_to_string(static_cast<int64_t>(i - start_off), &obj);
+                json.append(obj);
+            }
+            json.push_back(']');
+
+            doris::JsonBinaryValue jsonb_value;
+            RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size()));
+            dst_str.insert_data(jsonb_value.value(), jsonb_value.size());
+            dst_nullmap.push_back(0);
+        }
+        return Status::OK();
+    }
+
     // English comment: build a Variant(root_type=JSONB) column for rows [row_ord, row_ord+row_cnt)
     // and append into dst (expected Nullable(Variant) or Variant).
     Status _append_group_as_jsonb(GroupState& state, ordinal_t row_ord, size_t row_cnt,
@@ -232,7 +279,7 @@ private:
         auto nested_mut = nested_nullable->assume_mutable();
         if (elem_count > 0) {
             size_t tmp_n = elem_count;
-            RETURN_IF_ERROR(_append_group_as_jsonb(*nested_state, start_off, tmp_n, nested_mut));
+            RETURN_IF_ERROR(_append_group_as_jsonb_blob(*nested_state, start_off, tmp_n, nested_mut));
         } else {
             nested_mut->insert_many_defaults(elem_count);
         }
@@ -296,6 +343,336 @@ private:
     ordinal_t _current_ordinal = 0;
 };
 
+// Reads a whole NestedGroup as Nullable(JSONB): one JSON array of element objects per row.
+class NestedGroupWholeJsonbIterator : public ColumnIterator {
+public:
+    explicit NestedGroupWholeJsonbIterator(const NestedGroupReader* group_reader)
+            : _group_reader(group_reader) {}
+
+    Status init(const ColumnIteratorOptions& opts) override {
+        DCHECK(_group_reader && _group_reader->is_valid());
+        _iter_opts = opts;
+        RETURN_IF_ERROR(_build_group_state(_root_state, _group_reader));
+        return Status::OK();
+    }
+
+    Status seek_to_ordinal(ordinal_t ord_idx) override {
+        _current_ordinal = ord_idx;
+        RETURN_IF_ERROR(_seek_group_state(_root_state, ord_idx));
+        return Status::OK();
+    }
+
+    // Appends one non-null JSONB value per row (a JSON array holding that row's elements) for
+    // rows [_current_ordinal, _current_ordinal + *n), then advances the ordinal by *n.
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override {
+        RETURN_IF_ERROR(_append_group_as_jsonb_blob(_root_state, _current_ordinal, *n, dst));
+        _current_ordinal += *n;
+        *has_null = false;
+        return Status::OK();
+    }
+
+    // Appends the requested rows in order. Each maximal run of consecutive row ids is served by
+    // one seek + one next_batch, instead of a seek and an element-object build per row; the
+    // produced values are the same as reading the rows one at a time.
+    Status read_by_rowids(const rowid_t* rowids, const size_t count,
+                          vectorized::MutableColumnPtr& dst) override {
+        bool has_null = false;
+        size_t run_begin = 0;
+        while (run_begin < count) {
+            size_t run_end = run_begin + 1;
+            while (run_end < count && rowids[run_end] == rowids[run_end - 1] + 1) {
+                ++run_end;
+            }
+            RETURN_IF_ERROR(seek_to_ordinal(rowids[run_begin]));
+            size_t run_len = run_end - run_begin;
+            RETURN_IF_ERROR(next_batch(&run_len, dst, &has_null));
+            run_begin = run_end;
+        }
+        return Status::OK();
+    }
+
+    ordinal_t get_current_ordinal() const override { return _current_ordinal; }
+
+private:
+    // NOTE(review): _read_offsets_with_prev() is called below but is not defined in this class;
+    // it must be a free function visible at this point, otherwise this class does not compile --
+    // confirm.
+
+    // Iterators for one NestedGroup level: per-row offsets, scalar element fields (by name), and,
+    // recursively, the NestedGroups nested inside the elements (by name).
+    struct GroupState {
+        const NestedGroupReader* reader = nullptr;
+        ColumnIteratorUPtr offsets_iter;
+        std::unordered_map<std::string, ColumnIteratorUPtr> child_iters;
+        std::unordered_map<std::string, std::unique_ptr<GroupState>> nested_groups;
+    };
+
+    // Recursively creates and initializes (with the options captured by init()) the iterators of
+    // `reader` and of every NestedGroup nested inside it. Existing state is dropped first so a
+    // repeated init() does not keep iterators from the previous call.
+    Status _build_group_state(GroupState& state, const NestedGroupReader* reader) {
+        state.reader = reader;
+        state.child_iters.clear();
+        state.nested_groups.clear();
+        RETURN_IF_ERROR(reader->offsets_reader->new_iterator(&state.offsets_iter, nullptr));
+        RETURN_IF_ERROR(state.offsets_iter->init(_iter_opts));
+        for (const auto& [name, cr] : reader->child_readers) {
+            ColumnIteratorUPtr it;
+            RETURN_IF_ERROR(cr->new_iterator(&it, nullptr));
+            RETURN_IF_ERROR(it->init(_iter_opts));
+            state.child_iters.emplace(name, std::move(it));
+        }
+        for (const auto& [name, nested] : reader->nested_group_readers) {
+            auto st = std::make_unique<GroupState>();
+            RETURN_IF_ERROR(_build_group_state(*st, nested.get()));
+            state.nested_groups.emplace(name, std::move(st));
+        }
+        return Status::OK();
+    }
+
+    // Positions `state` at row `row_ord`: the offsets iterator at `row_ord`, and the element-level
+    // child iterators / nested groups at the first element of that row.
+    Status _seek_group_state(GroupState& state, ordinal_t row_ord) {
+        RETURN_IF_ERROR(state.offsets_iter->seek_to_ordinal(row_ord));
+        uint64_t start_off = 0;
+        if (row_ord > 0) {
+            // `start_off` receives the first element of row `row_ord` (the same contract
+            // _append_group_as_jsonb_blob relies on). `row_end` is the END of that row and is
+            // deliberately not used as the seek target: that would skip the whole row.
+            std::vector<uint64_t> row_end;
+            RETURN_IF_ERROR(_read_offsets_with_prev(state.offsets_iter.get(), row_ord, 1,
+                                                    &start_off, &row_end));
+            // Reading advanced the offsets cursor; put it back on `row_ord`.
+            RETURN_IF_ERROR(state.offsets_iter->seek_to_ordinal(row_ord));
+        }
+        RETURN_IF_ERROR(_seek_child_iters(state, start_off));
+        for (auto& [_, nested] : state.nested_groups) {
+            // Element ordinals of this level are row ordinals of the nested group.
+            RETURN_IF_ERROR(_seek_group_state(*nested, start_off));
+        }
+        return Status::OK();
+    }
+
+    // Seeks every scalar child iterator of `state` to element ordinal `start_off`.
+    Status _seek_child_iters(GroupState& state, uint64_t start_off) {
+        for (auto& [_, it] : state.child_iters) {
+            RETURN_IF_ERROR(it->seek_to_ordinal(start_off));
+        }
+        return Status::OK();
+    }
+
+    // Reads `elem_count` values from every scalar child iterator (already positioned at the first
+    // element of the batch by the caller) and attaches each as a subcolumn of `elem_obj_ptr` at
+    // path `name`. `start_off` is not needed here.
+    Status _fill_elem_object_with_scalar_children(GroupState& state, uint64_t /*start_off*/,
+                                                  size_t elem_count,
+                                                  vectorized::ColumnVariant* elem_obj_ptr) {
+        for (auto& [name, it] : state.child_iters) {
+            auto type = state.reader->child_readers.at(name)->get_vec_data_type();
+            auto col = type->create_column();
+            if (elem_count > 0) {
+                size_t to_read = elem_count;
+                bool child_has_null = false;
+                RETURN_IF_ERROR(it->next_batch(&to_read, col, &child_has_null));
+            }
+            // Every subcolumn of the element object must have exactly elem_count rows.
+            if (col->size() != elem_count) {
+                return Status::InternalError(
+                        "NestedGroup child field {}: expected {} values, read {}", name, elem_count,
+                        col->size());
+            }
+            if (!elem_obj_ptr->add_sub_column(vectorized::PathInData(name), std::move(col), type)) {
+                return Status::InternalError("Duplicated NestedGroup child field {}", name);
+            }
+        }
+        return Status::OK();
+    }
+
+    // Materializes every NestedGroup nested inside elements [start_off, start_off + elem_count)
+    // as a Nullable(JSONB) column (one JSON array per element) and attaches it to `elem_obj_ptr`
+    // at path `name`.
+    Status _fill_elem_object_with_nested_groups(GroupState& state, uint64_t start_off,
+                                                size_t elem_count,
+                                                vectorized::ColumnVariant* elem_obj_ptr) {
+        const auto nested_type =
+                vectorized::make_nullable(std::make_shared<vectorized::DataTypeJsonb>());
+        for (auto& [name, nested_state] : state.nested_groups) {
+            auto nested_jsonb = vectorized::ColumnString::create();
+            auto nested_nullable = vectorized::ColumnNullable::create(
+                    std::move(nested_jsonb), vectorized::ColumnUInt8::create());
+            auto nested_mut = nested_nullable->assume_mutable();
+            // Skip the offsets read entirely for an empty batch.
+            if (elem_count > 0) {
+                RETURN_IF_ERROR(_append_group_as_jsonb_blob(*nested_state, start_off, elem_count,
+                                                            nested_mut));
+            }
+            if (!elem_obj_ptr->add_sub_column(vectorized::PathInData(name), std::move(nested_mut),
+                                              nested_type)) {
+                return Status::InternalError("Duplicated NestedGroup nested field {}", name);
+            }
+        }
+        return Status::OK();
+    }
+
+    // Appends to `dst` (Nullable(JSONB)) one non-null JSON array per row in
+    // [row_ord, row_ord + row_cnt) of `state`'s group. `start_off` and `offsets` are absolute
+    // element ordinals, while `elem_obj` only holds this batch's elements: element `i` lives at
+    // row `i - start_off` of `elem_obj`.
+    // NOLINTNEXTLINE(readability-function-cognitive-complexity,readability-function-size)
+    Status _append_group_as_jsonb_blob(GroupState& state, ordinal_t row_ord, size_t row_cnt,
+                                       vectorized::MutableColumnPtr& dst) {
+        auto* dst_nullable = assert_cast<vectorized::ColumnNullable*>(dst.get());
+        auto& dst_str = assert_cast<vectorized::ColumnString&>(dst_nullable->get_nested_column());
+        auto& dst_nullmap = dst_nullable->get_null_map_column().get_data();
+
+        uint64_t start_off = 0;
+        std::vector<uint64_t> offsets;
+        RETURN_IF_ERROR(_read_offsets_with_prev(state.offsets_iter.get(), row_ord, row_cnt,
+                                                &start_off, &offsets));
+        // One output value is produced per offset; callers rely on exactly row_cnt values.
+        if (offsets.size() != row_cnt) {
+            return Status::InternalError(
+                    "NestedGroup offsets: expected {} rows from ordinal {}, read {}", row_cnt,
+                    row_ord, offsets.size());
+        }
+        const uint64_t end_off = offsets.empty() ? start_off : offsets.back();
+        const auto elem_count = static_cast<size_t>(end_off - start_off);
+
+        RETURN_IF_ERROR(_seek_child_iters(state, start_off));
+
+        vectorized::MutableColumnPtr elem_obj = vectorized::ColumnVariant::create(0, elem_count);
+        auto* elem_obj_ptr = assert_cast<vectorized::ColumnVariant*>(elem_obj.get());
+        RETURN_IF_ERROR(
+                _fill_elem_object_with_scalar_children(state, start_off, elem_count, elem_obj_ptr));
+        RETURN_IF_ERROR(
+                _fill_elem_object_with_nested_groups(state, start_off, elem_count, elem_obj_ptr));
+
+        // Buffers reused across rows/elements to avoid per-iteration allocations.
+        std::string json;
+        std::string obj;
+        for (size_t r = 0; r < offsets.size(); ++r) {
+            const uint64_t row_begin = (r == 0) ? start_off : offsets[r - 1];
+            const uint64_t row_end = offsets[r];
+
+            json.assign(1, '[');
+            for (uint64_t i = row_begin; i < row_end; ++i) {
+                if (i > row_begin) {
+                    json.push_back(',');
+                }
+                obj.clear();
+                // `i` is absolute; elem_obj row 0 is element `start_off`.
+                elem_obj_ptr->serialize_one_row_to_string(static_cast<int64_t>(i - start_off),
+                                                          &obj);
+                json.append(obj);
+            }
+            json.push_back(']');
+
+            doris::JsonBinaryValue jsonb_value;
+            RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size()));
+            dst_str.insert_data(jsonb_value.value(), jsonb_value.size());
+            dst_nullmap.push_back(0);
+        }
+        return Status::OK();
+    }
+
+    const NestedGroupReader* _group_reader;
+    ColumnIteratorOptions _iter_opts;
+    GroupState _root_state;
+    ordinal_t _current_ordinal = 0;
+};
+
+// When the whole Variant column is read, merges the top-level NestedGroups back into the
+// ColumnVariant as Nullable(JSONB) subcolumns (path = the array's path), so their data stays
+// visible even though the JSONB subcolumns themselves are discarded during write/compaction.
+class VariantRootWithNestedGroupsIterator : public ColumnIterator {
+public:
+    // `nested_group_readers` may be null; only valid readers are kept. Only raw pointers are
+    // stored, so the readers must outlive this iterator.
+    VariantRootWithNestedGroupsIterator(ColumnIteratorUPtr root_iter,
+                                        const NestedGroupReaders* nested_group_readers)
+            : _root_iter(std::move(root_iter)) {
+        if (!nested_group_readers) {
+            return;
+        }
+        for (const auto& [path, reader] : *nested_group_readers) {
+            if (reader && reader->is_valid()) {
+                _nested_group_readers.emplace_back(path, reader.get());
+            }
+        }
+    }
+
+    Status init(const ColumnIteratorOptions& opts) override {
+        RETURN_IF_ERROR(_root_iter->init(opts));
+        _nested_group_iters.clear();
+        _nested_group_iters.reserve(_nested_group_readers.size());
+        for (const auto& [path, reader] : _nested_group_readers) {
+            auto it = std::make_unique<NestedGroupWholeJsonbIterator>(reader);
+            RETURN_IF_ERROR(it->init(opts));
+            _nested_group_iters.emplace_back(path, std::move(it));
+        }
+        return Status::OK();
+    }
+
+    Status seek_to_ordinal(ordinal_t ord_idx) override {
+        RETURN_IF_ERROR(_root_iter->seek_to_ordinal(ord_idx));
+        for (auto& [_, it] : _nested_group_iters) {
+            RETURN_IF_ERROR(it->seek_to_ordinal(ord_idx));
+        }
+        return Status::OK();
+    }
+
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override {
+        RETURN_IF_ERROR(_read_and_merge(
+                dst,
+                [&](vectorized::MutableColumnPtr& root_column) {
+                    return _root_iter->next_batch(n, root_column, has_null);
+                },
+                [&](NestedGroupWholeJsonbIterator& group_iter, vectorized::MutableColumnPtr& col) {
+                    // *n now holds the number of rows the root iterator actually produced.
+                    size_t group_rows = *n;
+                    bool group_has_null = false;
+                    return group_iter.next_batch(&group_rows, col, &group_has_null);
+                }));
+        *has_null = false;
+        return Status::OK();
+    }
+
+    Status read_by_rowids(const rowid_t* rowids, const size_t count,
+                          vectorized::MutableColumnPtr& dst) override {
+        return _read_and_merge(
+                dst,
+                [&](vectorized::MutableColumnPtr& root_column) {
+                    return _root_iter->read_by_rowids(rowids, count, root_column);
+                },
+                [&](NestedGroupWholeJsonbIterator& group_iter, vectorized::MutableColumnPtr& col) {
+                    return group_iter.read_by_rowids(rowids, count, col);
+                });
+    }
+
+    ordinal_t get_current_ordinal() const override { return _root_iter->get_current_ordinal(); }
+
+private:
+    // Shared body of next_batch()/read_by_rowids().
+    // - Reads the root column via `read_root`.
+    // - Reads one Nullable(JSONB) column per NestedGroup via `read_group`.
+    // - Appends them to `dst` (Variant or Nullable(Variant)) as a ColumnVariant. The root column
+    //   becomes its root, and each group's column becomes the subcolumn at that group's path.
+    template <typename ReadRootFn, typename ReadGroupFn>
+    Status _read_and_merge(vectorized::MutableColumnPtr& dst, ReadRootFn&& read_root,
+                           ReadGroupFn&& read_group) {
+        auto& obj =
+                dst->is_nullable()
+                        ? assert_cast<vectorized::ColumnVariant&>(
+                                  assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
+                        : assert_cast<vectorized::ColumnVariant&>(*dst);
+        auto most_common_type = obj.get_most_common_type(); // NOLINT(readability-static-accessed-through-instance)
+
+        auto root_column = most_common_type->create_column();
+        RETURN_IF_ERROR(read_root(root_column));
+        const size_t rows = root_column->size();
+
+        // Read every NestedGroup before touching `dst`. Each group must produce exactly one value
+        // per root row; otherwise the merged ColumnVariant would be corrupted.
+        auto ng_type = vectorized::make_nullable(std::make_shared<vectorized::DataTypeJsonb>());
+        std::vector<vectorized::MutableColumnPtr> group_columns;
+        group_columns.reserve(_nested_group_iters.size());
+        for (auto& [path, it] : _nested_group_iters) {
+            auto col = ng_type->create_column();
+            RETURN_IF_ERROR(read_group(*it, col));
+            if (col->size() != rows) {
+                return Status::InternalError(
+                        "NestedGroup {} produced {} rows, but the variant root produced {}", path,
+                        col->size(), rows);
+            }
+            group_columns.push_back(std::move(col));
+        }
+
+        // Keep dst's null map aligned with the rows appended to the nested ColumnVariant: copy
+        // the root's null map when it has one, otherwise mark every appended row non-null.
+        if (dst->is_nullable()) {
+            auto& dst_null_map =
+                    assert_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column();
+            if (root_column->is_nullable()) {
+                auto& src_null_map =
+                        assert_cast<vectorized::ColumnNullable&>(*root_column).get_null_map_column();
+                dst_null_map.insert_range_from(src_null_map, 0, src_null_map.size());
+            } else {
+                dst_null_map.insert_many_defaults(rows);
+            }
+        }
+
+        // Temporary ColumnVariant for this batch: root + one JSONB subcolumn per NestedGroup.
+        auto tmp = vectorized::ColumnVariant::create(0, rows);
+        auto& tmp_obj = assert_cast<vectorized::ColumnVariant&>(*tmp);
+        tmp_obj.add_sub_column({}, std::move(root_column), most_common_type);
+        for (size_t i = 0; i < group_columns.size(); ++i) {
+            const auto& path = _nested_group_iters[i].first;
+            if (!tmp_obj.add_sub_column(vectorized::PathInData(path), std::move(group_columns[i]),
+                                        ng_type)) {
+                return Status::InternalError("Duplicated NestedGroup path {}", path);
+            }
+        }
+
+        obj.insert_range_from(*tmp, 0, tmp_obj.rows());
+        if (!obj.is_finalized()) {
+            obj.finalize();
+        }
+#ifndef NDEBUG
+        obj.check_consistency();
+#endif
+        return Status::OK();
+    }
+
+    ColumnIteratorUPtr _root_iter;
+    // (path, reader) of every valid top-level NestedGroup; the readers are owned elsewhere.
+    std::vector<std::pair<std::string, const NestedGroupReader*>> _nested_group_readers;
+    // One iterator per entry of _nested_group_readers, in the same order; created by init().
+    std::vector<std::pair<std::string, std::unique_ptr<NestedGroupWholeJsonbIterator>>>
+            _nested_group_iters;
+};
+
 } // namespace
 
 const SubcolumnColumnMetaInfo::Node* VariantColumnReader::get_subcolumn_meta_by_path(
@@ -739,6 +1116,12 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(
         return
Status::OK();
     }
     if (relative_path.empty()) {
+        // Root path with NestedGroups: wrap the root reader so the groups are re-attached as JSONB subcolumns.
+        if (!_nested_group_readers.empty()) {
+            *iterator = std::make_unique<VariantRootWithNestedGroupsIterator>(
+                    std::make_unique<FileColumnIterator>(_root_column_reader), &_nested_group_readers);
+            return Status::OK();
+        }
         // root path, use VariantRootColumnIterator
         *iterator = std::make_unique<VariantRootColumnIterator>(
                 std::make_unique<FileColumnIterator>(_root_column_reader));
@@ -897,6 +1280,18 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
                                           column_reader_cache, sparse_column_cache_ptr);
     }
 
+    // Whole-Variant read; duplicates the root branch of _new_iterator_with_flat_leaves() -- keep both in sync.
+    if (relative_path.empty()) {
+        if (!_nested_group_readers.empty()) {
+            *iterator = std::make_unique<VariantRootWithNestedGroupsIterator>(
+                    std::make_unique<FileColumnIterator>(_root_column_reader), &_nested_group_readers);
+            return Status::OK();
+        }
+        *iterator = std::make_unique<VariantRootColumnIterator>(
+                std::make_unique<FileColumnIterator>(_root_column_reader));
+        return Status::OK();
+    }
+
     // Check if path is prefix, example sparse columns path: a.b.c, a.b.e, access prefix: a.b.
     // Or access root path
     if (has_prefix_path(relative_path)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
