This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch feat-nested in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9a3aa5432b4492ea312ef504bc5dd8cd9ca3cc2a Author: eldenmoon <[email protected]> AuthorDate: Fri Jan 9 20:35:41 2026 +0800 Refactor NestedGroup whole iterator output paths. Use a single NestedGroupWholeIterator for both Variant(JSONB) and JSONB blob outputs by selecting output format based on destination column type. Also reuse JsonBinaryValue across rows to reduce allocations. --- .../segment_v2/variant/variant_column_reader.cpp | 219 ++------------------- 1 file changed, 20 insertions(+), 199 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp index f3f923a5efe..5d209718590 100644 --- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp @@ -101,7 +101,7 @@ public: } Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override { - RETURN_IF_ERROR(_append_group_as_jsonb(_root_state, _current_ordinal, *n, dst)); + RETURN_IF_ERROR(_append_group_as_any(_root_state, _current_ordinal, *n, dst)); _current_ordinal += *n; *has_null = false; return Status::OK(); @@ -167,6 +167,21 @@ private: return Status::OK(); } + // English comment: output selector: + // - If dst is Nullable(JSONB) (ColumnNullable(ColumnString)), append JSONB blobs. + // - Otherwise append Nullable(Variant(root_type=JSONB)). + Status _append_group_as_any(GroupState& state, ordinal_t row_ord, size_t row_cnt, + vectorized::MutableColumnPtr& dst) { + if (auto* dst_nullable = vectorized::check_and_get_column<vectorized::ColumnNullable>(dst.get()); + dst_nullable != nullptr) { + if (vectorized::check_and_get_column<vectorized::ColumnString>( + &dst_nullable->get_nested_column()) != nullptr) { + return _append_group_as_jsonb_blob(state, row_ord, row_cnt, dst); + } + } + return _append_group_as_jsonb(state, row_ord, row_cnt, dst); + } + // English comment: build a JSONB blob column (Nullable(JSONB)) for rows [row_ord, row_ord+row_cnt). // This is used for nested groups inside an element object. Status _append_group_as_jsonb_blob(GroupState& state, ordinal_t row_ord, size_t row_cnt, @@ -190,6 +205,7 @@ private: _fill_elem_object_with_scalar_children(state, start_off, elem_count, elem_obj_ptr)); RETURN_IF_ERROR(_fill_elem_object_with_nested_groups(state, start_off, elem_count, elem_obj_ptr)); + doris::JsonBinaryValue jsonb_value; for (size_t r = 0; r < offsets.size(); ++r) { uint64_t off = offsets[r]; uint64_t prev = (r == 0) ? start_off : offsets[r - 1]; @@ -206,7 +222,6 @@ private: } json.push_back(']'); - doris::JsonBinaryValue jsonb_value; RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size())); dst_str.insert_data(jsonb_value.value(), jsonb_value.size()); dst_nullmap.push_back(0); @@ -303,6 +318,7 @@ private: dst_nullable ? assert_cast<vectorized::ColumnVariant&>(dst_nullable->get_nested_column()) : assert_cast<vectorized::ColumnVariant&>(*dst); + doris::JsonBinaryValue jsonb_value; for (size_t r = 0; r < offsets.size(); ++r) { uint64_t off = offsets[r]; uint64_t prev = (r == 0) ? start_off : offsets[r - 1]; @@ -319,7 +335,6 @@ private: } json.push_back(']'); - doris::JsonBinaryValue jsonb_value; RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size())); vectorized::Field jsonb_field = vectorized::Field::create_field<TYPE_JSONB>( vectorized::JsonbField(jsonb_value.value(), jsonb_value.size())); @@ -343,200 +358,6 @@ private: ordinal_t _current_ordinal = 0; }; -// Iterator for reading the whole NestedGroup as Nullable(JSONB) (for merging into ColumnVariant). -class NestedGroupWholeJsonbIterator : public ColumnIterator { -public: - explicit NestedGroupWholeJsonbIterator(const NestedGroupReader* group_reader) - : _group_reader(group_reader) {} - - Status init(const ColumnIteratorOptions& opts) override { - DCHECK(_group_reader && _group_reader->is_valid()); - _iter_opts = opts; - RETURN_IF_ERROR(_build_group_state(_root_state, _group_reader)); - return Status::OK(); - } - - Status seek_to_ordinal(ordinal_t ord_idx) override { - _current_ordinal = ord_idx; - RETURN_IF_ERROR(_seek_group_state(_root_state, ord_idx)); - return Status::OK(); - } - - Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override { - RETURN_IF_ERROR(_append_group_as_jsonb_blob(_root_state, _current_ordinal, *n, dst)); - _current_ordinal += *n; - *has_null = false; - return Status::OK(); - } - - Status read_by_rowids(const rowid_t* rowids, const size_t count, - vectorized::MutableColumnPtr& dst) override { - bool has_null = false; - for (size_t i = 0; i < count; ++i) { - RETURN_IF_ERROR(seek_to_ordinal(rowids[i])); - size_t one = 1; - RETURN_IF_ERROR(next_batch(&one, dst, &has_null)); - } - return Status::OK(); - } - - ordinal_t get_current_ordinal() const override { return _current_ordinal; } - -private: - struct GroupState { - const NestedGroupReader* reader = nullptr; - ColumnIteratorUPtr offsets_iter; - std::unordered_map<std::string, ColumnIteratorUPtr> child_iters; - std::unordered_map<std::string, std::unique_ptr<GroupState>> nested_groups; - }; - - Status _build_group_state(GroupState& state, const NestedGroupReader* reader) { - state.reader = reader; - RETURN_IF_ERROR(reader->offsets_reader->new_iterator(&state.offsets_iter, nullptr)); - RETURN_IF_ERROR(state.offsets_iter->init(_iter_opts)); - for (const auto& [name, cr] : reader->child_readers) { - ColumnIteratorUPtr it; - RETURN_IF_ERROR(cr->new_iterator(&it, nullptr)); - RETURN_IF_ERROR(it->init(_iter_opts)); - state.child_iters.emplace(name, std::move(it)); - } - for (const auto& [name, nested] : reader->nested_group_readers) { - auto st = std::make_unique<GroupState>(); - RETURN_IF_ERROR(_build_group_state(*st, nested.get())); - state.nested_groups.emplace(name, std::move(st)); - } - return Status::OK(); - } - - Status _seek_group_state(GroupState& state, ordinal_t row_ord) { - RETURN_IF_ERROR(state.offsets_iter->seek_to_ordinal(row_ord)); - uint64_t start_off = 0; - if (row_ord > 0) { - std::vector<uint64_t> prev; - RETURN_IF_ERROR(_read_offsets_with_prev(state.offsets_iter.get(), row_ord, 1, &start_off, - &prev)); - if (!prev.empty()) { - start_off = prev[0]; - } - } - for (auto& [_, it] : state.child_iters) { - RETURN_IF_ERROR(it->seek_to_ordinal(start_off)); - } - for (auto& [_, nested] : state.nested_groups) { - RETURN_IF_ERROR(_seek_group_state(*nested, start_off)); - } - return Status::OK(); - } - - Status _seek_child_iters(GroupState& state, uint64_t start_off) { - for (auto& [_, it] : state.child_iters) { - RETURN_IF_ERROR(it->seek_to_ordinal(start_off)); - } - return Status::OK(); - } - - Status _fill_elem_object_with_scalar_children(GroupState& state, uint64_t start_off, size_t elem_count, - vectorized::ColumnVariant* elem_obj_ptr) { - std::unordered_map<std::string, vectorized::MutableColumnPtr> child_cols; - child_cols.reserve(state.child_iters.size()); - for (auto& [name, it] : state.child_iters) { - auto type = state.reader->child_readers.at(name)->get_vec_data_type(); - auto col = type->create_column(); - if (elem_count > 0) { - size_t to_read = elem_count; - bool child_has_null = false; - RETURN_IF_ERROR(it->next_batch(&to_read, col, &child_has_null)); - } - child_cols.emplace(name, std::move(col)); - } - for (auto& [name, col] : child_cols) { - vectorized::PathInData p(name); - bool ok = elem_obj_ptr->add_sub_column( - p, col->assume_mutable(), - state.reader->child_readers.at(name)->get_vec_data_type()); - if (!ok) { - return Status::InternalError("Duplicated NestedGroup child field {}", name); - } - } - return Status::OK(); - } - - Status _fill_elem_object_with_nested_groups(GroupState& state, uint64_t start_off, size_t elem_count, - vectorized::ColumnVariant* elem_obj_ptr) { - for (auto& [name, nested_state] : state.nested_groups) { - auto nested_jsonb = vectorized::ColumnString::create(); - auto nested_nullable = vectorized::ColumnNullable::create( - std::move(nested_jsonb), vectorized::ColumnUInt8::create()); - auto nested_mut = nested_nullable->assume_mutable(); - if (elem_count > 0) { - size_t tmp_n = elem_count; - RETURN_IF_ERROR(_append_group_as_jsonb_blob(*nested_state, start_off, tmp_n, nested_mut)); - } else { - nested_mut->insert_many_defaults(elem_count); - } - vectorized::PathInData p(name); - bool ok = elem_obj_ptr->add_sub_column( - p, std::move(nested_mut), - vectorized::make_nullable(std::make_shared<vectorized::DataTypeJsonb>())); - if (!ok) { - return Status::InternalError("Duplicated NestedGroup nested field {}", name); - } - } - return Status::OK(); - } - - // NOLINTNEXTLINE(readability-function-cognitive-complexity,readability-function-size) - Status _append_group_as_jsonb_blob(GroupState& state, ordinal_t row_ord, size_t row_cnt, - vectorized::MutableColumnPtr& dst) { - auto* dst_nullable = assert_cast<vectorized::ColumnNullable*>(dst.get()); - auto& dst_str = assert_cast<vectorized::ColumnString&>(dst_nullable->get_nested_column()); - auto& dst_nullmap = dst_nullable->get_null_map_column().get_data(); - - uint64_t start_off = 0; - std::vector<uint64_t> offsets; - RETURN_IF_ERROR(_read_offsets_with_prev(state.offsets_iter.get(), row_ord, row_cnt, &start_off, - &offsets)); - uint64_t end_off = offsets.empty() ? start_off : offsets.back(); - auto elem_count = static_cast<size_t>(end_off - start_off); - - RETURN_IF_ERROR(_seek_child_iters(state, start_off)); - - vectorized::MutableColumnPtr elem_obj = vectorized::ColumnVariant::create(0, elem_count); - auto* elem_obj_ptr = assert_cast<vectorized::ColumnVariant*>(elem_obj.get()); - RETURN_IF_ERROR( - _fill_elem_object_with_scalar_children(state, start_off, elem_count, elem_obj_ptr)); - RETURN_IF_ERROR(_fill_elem_object_with_nested_groups(state, start_off, elem_count, elem_obj_ptr)); - - for (size_t r = 0; r < offsets.size(); ++r) { - uint64_t off = offsets[r]; - uint64_t prev = (r == 0) ? start_off : offsets[r - 1]; - - std::string json; - json.push_back('['); - for (uint64_t i = prev; i < off; ++i) { - if (i > prev) { - json.push_back(','); - } - std::string obj; - elem_obj_ptr->serialize_one_row_to_string(static_cast<int64_t>(i), &obj); - json.append(obj); - } - json.push_back(']'); - - doris::JsonBinaryValue jsonb_value; - RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size())); - dst_str.insert_data(jsonb_value.value(), jsonb_value.size()); - dst_nullmap.push_back(0); - } - return Status::OK(); - } - - const NestedGroupReader* _group_reader; - ColumnIteratorOptions _iter_opts; - GroupState _root_state; - ordinal_t _current_ordinal = 0; -}; - } // namespace namespace { @@ -564,7 +385,7 @@ public: _nested_group_iters.clear(); _nested_group_iters.reserve(_nested_group_readers.size()); for (const auto& [path, reader] : _nested_group_readers) { - auto it = std::make_unique<NestedGroupWholeJsonbIterator>(reader); + auto it = std::make_unique<NestedGroupWholeIterator>(reader); RETURN_IF_ERROR(it->init(opts)); _nested_group_iters.emplace_back(path, std::move(it)); } @@ -670,7 +491,7 @@ public: private: ColumnIteratorUPtr _root_iter; std::vector<std::pair<std::string, const NestedGroupReader*>> _nested_group_readers; - std::vector<std::pair<std::string, std::unique_ptr<NestedGroupWholeJsonbIterator>>> _nested_group_iters; + std::vector<std::pair<std::string, std::unique_ptr<NestedGroupWholeIterator>>> _nested_group_iters; }; } // namespace --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
