This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch feat-nested
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 50b5c54f929fc61b739445d1ecf43491fdb7f63d
Author: eldenmoon <[email protected]>
AuthorDate: Fri Jan 9 18:40:49 2026 +0800

    Variant nested first commit
---
 be/src/common/config.cpp                           |   4 +
 be/src/common/config.h                             |   3 +
 be/src/olap/rowset/segment_creator.cpp             |  27 +-
 .../segment_v2/variant/variant_column_reader.cpp   | 620 ++++++++++++++++++++-
 .../segment_v2/variant/variant_column_reader.h     |  69 +++
 .../variant/variant_column_writer_impl.cpp         | 146 +++++
 .../variant/variant_column_writer_impl.h           |  14 +
 .../rowset/segment_v2/variant/variant_statistics.h |  18 +-
 be/src/vec/columns/column_variant.cpp              | 263 ++++++++-
 be/src/vec/columns/column_variant.h                | 114 ++++
 be/src/vec/json/json_parser.cpp                    |  30 +-
 be/src/vec/json/parse2column.cpp                   | 187 ++++---
 be/src/vec/json/path_in_data.h                     |   8 +
 be/src/vec/json/variant_nested_builder.cpp         | 349 ++++++++++++
 be/src/vec/json/variant_nested_builder.h           | 121 ++++
 gensrc/proto/segment_v2.proto                      |  22 +
 16 files changed, 1865 insertions(+), 130 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 8c0f62a8ab0..18112f80972 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1128,6 +1128,10 @@ DEFINE_mBool(variant_use_cloud_schema_dict_cache, "true");
 DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
 DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
 DEFINE_mBool(enable_vertical_compact_variant_subcolumns, "true");
+// Maximum depth of nested arrays to track with NestedGroup
+// Reserved for future use when NestedGroup expansion moves to storage layer
+// Deeper arrays will be stored as JSONB
+DEFINE_mInt32(variant_nested_group_max_depth, "3");
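+// Hypothetical illustration: with max depth 3, paths like $.a[*].b[*].c[*] can be tracked
+// as NestedGroups, while a fourth level such as $.a[*].b[*].c[*].d[*] falls back to JSONB.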
 
 // block file cache
 DEFINE_Bool(enable_file_cache, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c4fa8f82cc8..d11855f1863 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1337,6 +1337,9 @@ DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
 DECLARE_mBool(variant_throw_exeception_on_invalid_json);
 // Enable vertical compact subcolumns of variant column
 DECLARE_mBool(enable_vertical_compact_variant_subcolumns);
+// Maximum depth of nested arrays to track with NestedGroup
+// Reserved for future use when NestedGroup expansion moves to storage layer
+DECLARE_mInt32(variant_nested_group_max_depth);
 
 DECLARE_mBool(enable_merge_on_write_correctness_check);
 // USED FOR DEBUGING
diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp
index 489f24728cc..13046c71d02 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -18,7 +18,7 @@
 #include "olap/rowset/segment_creator.h"
 
 // IWYU pragma: no_include <bthread/errno.h>
-#include <errno.h> // IWYU pragma: keep
+#include <cerrno> // IWYU pragma: keep
 
 #include <filesystem>
 #include <memory>
@@ -60,6 +60,7 @@ SegmentFlusher::SegmentFlusher(RowsetWriterContext& context, SegmentFileCollecti
 
 SegmentFlusher::~SegmentFlusher() = default;
 
+// NOLINTNEXTLINE(readability-function-cognitive-complexity)
 Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_t segment_id,
                                           int64_t* flush_size) {
     if (block->rows() == 0) {
@@ -103,10 +104,12 @@ Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block)
         return Status::OK();
     }
 
-    vectorized::ParseConfig config;
-    config.enable_flatten_nested = _context.tablet_schema->variant_flatten_nested();
+    vectorized::ParseConfig parse_config;
+    // enable_flatten_nested controls whether nested array<object> paths are flattened at
+    // parse time; NestedGroup expansion is handled at the storage layer, not at parse time.
+    parse_config.enable_flatten_nested = _context.tablet_schema->variant_flatten_nested();
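+    // Illustrative example (hypothetical row): with flattening enabled, {"tags": [{"k": "a"}]}
+    // produces the subcolumn path "tags.k"; with it disabled, the array stays a single nested
+    // value and is left for the storage layer to expand.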
     RETURN_IF_ERROR(
-            vectorized::schema_util::parse_variant_columns(block, variant_column_pos, config));
+            vectorized::schema_util::parse_variant_columns(block, variant_column_pos, parse_config));
     return Status::OK();
 }
 
@@ -115,19 +118,19 @@ Status SegmentFlusher::close() {
 }
 
 Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
-                                 const vectorized::Block* block, size_t row_offset,
-                                 size_t row_num) {
-    RETURN_IF_ERROR(segment_writer->append_block(block, row_offset, row_num));
-    _num_rows_written += row_num;
+                                 const vectorized::Block* block, size_t row_pos,
+                                 size_t num_rows) {
+    RETURN_IF_ERROR(segment_writer->append_block(block, row_pos, num_rows));
+    _num_rows_written += num_rows;
     return Status::OK();
 }
 
 Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
-                                 const vectorized::Block* block, size_t row_offset,
-                                 size_t row_num) {
-    RETURN_IF_ERROR(segment_writer->batch_block(block, row_offset, row_num));
+                                 const vectorized::Block* block, size_t row_pos,
+                                 size_t num_rows) {
+    RETURN_IF_ERROR(segment_writer->batch_block(block, row_pos, num_rows));
     RETURN_IF_ERROR(segment_writer->write_batch());
-    _num_rows_written += row_num;
+    _num_rows_written += num_rows;
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
index b64c63b3ffe..3aea9ec79d8 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
@@ -20,7 +20,6 @@
 #include <gen_cpp/segment_v2.pb.h>
 
 #include <memory>
-#include <set>
 #include <utility>
 
 #include "common/config.h"
@@ -29,20 +28,18 @@
 #include "olap/rowset/segment_v2/column_meta_accessor.h"
 #include "olap/rowset/segment_v2/column_reader.h"
 #include "olap/rowset/segment_v2/column_reader_cache.h"
-#include "olap/rowset/segment_v2/indexed_column_reader.h"
-#include "olap/rowset/segment_v2/page_handle.h"
 #include "olap/rowset/segment_v2/segment.h"
 #include "olap/rowset/segment_v2/variant/hierarchical_data_iterator.h"
 #include "olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h"
 #include "olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h"
 #include "olap/tablet_schema.h"
-#include "util/slice.h"
 #include "vec/columns/column_array.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_string.h"
 #include "vec/columns/column_variant.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/schema_util.h"
+#include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/json/path_in_data.h"
 
@@ -50,6 +47,216 @@ namespace doris::segment_v2 {
 
 #include "common/compile_check_begin.h"
 
+namespace {
+
+// Read offsets in range [start, start + count) and also return the previous offset
+// (offsets[start - 1], or 0 when start == 0), i.e. the flat start position of row `start`.
+Status _read_offsets_with_prev(ColumnIterator* offsets_iter, ordinal_t start, size_t count,
+                               uint64_t* prev, std::vector<uint64_t>* out) {
+    *prev = 0;
+    out->clear();
+    if (count == 0) {
+        return Status::OK();
+    }
+    if (start > 0) {
+        RETURN_IF_ERROR(offsets_iter->seek_to_ordinal(start - 1));
+        vectorized::MutableColumnPtr prev_col = vectorized::ColumnOffset64::create();
+        size_t one = 1;
+        bool has_null = false;
+        RETURN_IF_ERROR(offsets_iter->next_batch(&one, prev_col, &has_null));
+        auto* prev_data = assert_cast<vectorized::ColumnOffset64*>(prev_col.get());
+        if (!prev_data->get_data().empty()) {
+            *prev = prev_data->get_data()[0];
+        }
+    }
+    RETURN_IF_ERROR(offsets_iter->seek_to_ordinal(start));
+    vectorized::MutableColumnPtr off_col = vectorized::ColumnOffset64::create();
+    bool has_null = false;
+    RETURN_IF_ERROR(offsets_iter->next_batch(&count, off_col, &has_null));
+    auto* off_data = assert_cast<vectorized::ColumnOffset64*>(off_col.get());
+    out->assign(off_data->get_data().begin(), off_data->get_data().end());
+    return Status::OK();
+}
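+
+// Offsets encoding, worked example (illustrative): rows holding arrays of sizes 2, 0 and 3
+// are stored as cumulative offsets [2, 2, 5]; row i spans flat positions
+// [offsets[i-1], offsets[i]), with offsets[-1] taken as 0, so the third row spans [2, 5).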
+
+// Iterator for reading a whole NestedGroup as ColumnVariant::NESTED_TYPE
+// (Nullable(Array(Nullable(Variant)))).
+class NestedGroupWholeIterator : public ColumnIterator {
+public:
+    explicit NestedGroupWholeIterator(const NestedGroupReader* group_reader)
+            : _group_reader(group_reader) {}
+
+    Status init(const ColumnIteratorOptions& opts) override {
+        DCHECK(_group_reader && _group_reader->is_valid());
+        _iter_opts = opts;
+
+        // Build iterators for this group recursively.
+        RETURN_IF_ERROR(_build_group_state(_root_state, _group_reader));
+        return Status::OK();
+    }
+
+    Status seek_to_ordinal(ordinal_t ord_idx) override {
+        _current_ordinal = ord_idx;
+        RETURN_IF_ERROR(_seek_group_state(_root_state, ord_idx));
+        return Status::OK();
+    }
+
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override {
+        RETURN_IF_ERROR(_append_group_as_nested_type(_root_state, _current_ordinal, *n, dst));
+        _current_ordinal += *n;
+        *has_null = false;
+        return Status::OK();
+    }
+
+    Status read_by_rowids(const rowid_t* rowids, const size_t count,
+                          vectorized::MutableColumnPtr& dst) override {
+        bool has_null = false;
+        for (size_t i = 0; i < count; ++i) {
+            RETURN_IF_ERROR(seek_to_ordinal(rowids[i]));
+            size_t one = 1;
+            RETURN_IF_ERROR(next_batch(&one, dst, &has_null));
+        }
+        return Status::OK();
+    }
+
+    ordinal_t get_current_ordinal() const override { return _current_ordinal; }
+
+private:
+    struct GroupState {
+        const NestedGroupReader* reader = nullptr;
+        ColumnIteratorUPtr offsets_iter;
+        std::unordered_map<std::string, ColumnIteratorUPtr> child_iters;
+        std::unordered_map<std::string, std::unique_ptr<GroupState>> nested_groups;
+    };
+
+    Status _build_group_state(GroupState& state, const NestedGroupReader* reader) {
+        state.reader = reader;
+        RETURN_IF_ERROR(reader->offsets_reader->new_iterator(&state.offsets_iter, nullptr));
+        RETURN_IF_ERROR(state.offsets_iter->init(_iter_opts));
+        for (const auto& [name, cr] : reader->child_readers) {
+            ColumnIteratorUPtr it;
+            RETURN_IF_ERROR(cr->new_iterator(&it, nullptr));
+            RETURN_IF_ERROR(it->init(_iter_opts));
+            state.child_iters.emplace(name, std::move(it));
+        }
+        for (const auto& [name, nested] : reader->nested_group_readers) {
+            auto st = std::make_unique<GroupState>();
+            RETURN_IF_ERROR(_build_group_state(*st, nested.get()));
+            state.nested_groups.emplace(name, std::move(st));
+        }
+        return Status::OK();
+    }
+
+    Status _seek_group_state(GroupState& state, ordinal_t row_ord) {
+        RETURN_IF_ERROR(state.offsets_iter->seek_to_ordinal(row_ord));
+        // The flat start position of row `row_ord` is offsets[row_ord - 1] (0 for row 0);
+        // using the offset *at* row_ord here would skip the row's own elements.
+        uint64_t start_off = 0;
+        if (row_ord > 0) {
+            std::vector<uint64_t> unused;
+            RETURN_IF_ERROR(_read_offsets_with_prev(state.offsets_iter.get(), row_ord, 1,
+                                                    &start_off, &unused));
+            // Restore the offsets iterator position moved by _read_offsets_with_prev.
+            RETURN_IF_ERROR(state.offsets_iter->seek_to_ordinal(row_ord));
+        }
+        for (auto& [_, it] : state.child_iters) {
+            RETURN_IF_ERROR(it->seek_to_ordinal(start_off));
+        }
+        for (auto& [_, nested] : state.nested_groups) {
+            // Nested groups are indexed by parent element ordinal (flat index).
+            RETURN_IF_ERROR(_seek_group_state(*nested, start_off));
+        }
+        return Status::OK();
+    }
+
+    // Build an array<object> column (ColumnVariant::NESTED_TYPE) for rows [row_ord, row_ord + row_cnt)
+    // and append it into dst (expected Nullable(Array(Nullable(Variant)))).
+    Status _append_group_as_nested_type(GroupState& state, ordinal_t row_ord, size_t row_cnt,
+                                        vectorized::MutableColumnPtr& dst) {
+        // dst is expected to be Nullable(Array(Nullable(Variant)))
+        auto* dst_nullable = assert_cast<vectorized::ColumnNullable*>(dst.get());
+        auto& dst_array =
+                assert_cast<vectorized::ColumnArray&>(dst_nullable->get_nested_column());
+        auto& dst_offsets = dst_array.get_offsets();
+
+        uint64_t start_off = 0;
+        std::vector<uint64_t> offsets;
+        RETURN_IF_ERROR(_read_offsets_with_prev(state.offsets_iter.get(), row_ord, row_cnt,
+                                                &start_off, &offsets));
+        uint64_t end_off = offsets.empty() ? start_off : offsets.back();
+        auto elem_count = static_cast<size_t>(end_off - start_off);
+
+        // Seek all child iterators to the start offset (safe for both sequential/random usage)
+        for (auto& [_, it] : state.child_iters) {
+            RETURN_IF_ERROR(it->seek_to_ordinal(start_off));
+        }
+
+        // Read scalar child columns (flat) for this batch
+        std::unordered_map<std::string, vectorized::MutableColumnPtr> child_cols;
+        child_cols.reserve(state.child_iters.size());
+        for (auto& [name, it] : state.child_iters) {
+            auto type = state.reader->child_readers.at(name)->get_vec_data_type();
+            auto col = type->create_column();
+            if (elem_count > 0) {
+                size_t to_read = elem_count;
+                bool child_has_null = false;
+                RETURN_IF_ERROR(it->next_batch(&to_read, col, &child_has_null));
+            }
+            child_cols.emplace(name, std::move(col));
+        }
+
+        // Build element objects: a ColumnVariant whose paths are the field names
+        auto elem_obj = vectorized::ColumnVariant::create(0, elem_count);
+        auto* elem_obj_ptr = assert_cast<vectorized::ColumnVariant*>(elem_obj.get());
+        for (auto& [name, col] : child_cols) {
+            vectorized::PathInData p(name);
+            bool ok = elem_obj_ptr->add_sub_column(
+                    p, col->assume_mutable(),
+                    state.reader->child_readers.at(name)->get_vec_data_type());
+            if (!ok) {
+                return Status::InternalError("Duplicated NestedGroup child field {}", name);
+            }
+        }
+
+        // Reconstruct nested array fields (multi-level NestedGroup)
+        for (auto& [name, nested_state] : state.nested_groups) {
+            // Create a nested type column for this field, sized by elem_count (rows = parent elements)
+            vectorized::MutableColumnPtr nested_col =
+                    vectorized::ColumnVariant::NESTED_TYPE->create_column();
+            if (elem_count > 0) {
+                // Fill nested_col by reading nested group rows [start_off, start_off + elem_count)
+                RETURN_IF_ERROR(_append_group_as_nested_type(*nested_state, start_off, elem_count,
+                                                             nested_col));
+            }
+            vectorized::PathInData p(name);
+            bool ok = elem_obj_ptr->add_sub_column(p, std::move(nested_col),
+                                                   vectorized::ColumnVariant::NESTED_TYPE);
+            if (!ok) {
+                return Status::InternalError("Duplicated NestedGroup nested field {}", name);
+            }
+        }
+
+        // Ensure element objects are nullable to match NESTED_TYPE's inner nullable
+        vectorized::ColumnPtr elem_obj_nullable = vectorized::make_nullable(elem_obj->get_ptr());
+
+        // Append array offsets
+        size_t prev = dst_offsets.empty() ? 0 : dst_offsets.back();
+        for (size_t i = 0; i < offsets.size(); ++i) {
+            uint64_t sz = (i == 0) ? (offsets[i] - start_off) : (offsets[i] - offsets[i - 1]);
+            dst_offsets.push_back(prev + static_cast<size_t>(sz));
+            prev = dst_offsets.back();
+        }
+
+        // Append array data and mark the appended rows as non-null
+        dst_array.get_data().insert_range_from(*elem_obj_nullable, 0, elem_obj_nullable->size());
+        dst_nullable->get_null_map_column().insert_many_vals(0, offsets.size());
+        return Status::OK();
+    }
+
+    const NestedGroupReader* _group_reader;
+    ColumnIteratorOptions _iter_opts;
+    GroupState _root_state;
+    ordinal_t _current_ordinal = 0;
+};
+
+} // namespace
+
 const SubcolumnColumnMetaInfo::Node* 
VariantColumnReader::get_subcolumn_meta_by_path(
         const vectorized::PathInData& relative_path) const {
     const auto* node = _subcolumns_meta_info->find_leaf(relative_path);
@@ -158,7 +365,7 @@ private:
                 }
             }
 
-            std::sort(all_paths.begin(), all_paths.end());
+            std::ranges::sort(all_paths);
             for (const auto& [path, bucket, offset] : all_paths) {
                 dst_sparse_data_paths.insert_data(path.data(), path.size());
                 
dst_sparse_data_values.insert_from(*src_sparse_data_values_buckets[bucket], 
offset);
@@ -178,7 +385,9 @@ Status UnifiedSparseColumnReader::new_sparse_iterator(ColumnIteratorUPtr* iter)
         std::vector<std::unique_ptr<ColumnIterator>> iters;
         iters.reserve(_buckets.size());
         for (const auto& br : _buckets) {
-            if (!br) continue;
+            if (!br) {
+                continue;
+            }
             ColumnIteratorUPtr it;
             RETURN_IF_ERROR(br->new_iterator(&it, nullptr));
             iters.emplace_back(std::move(it));
@@ -195,8 +404,8 @@ Status UnifiedSparseColumnReader::new_sparse_iterator(ColumnIteratorUPtr* iter)
 std::pair<std::shared_ptr<ColumnReader>, std::string>
 UnifiedSparseColumnReader::select_reader_and_cache_key(const std::string& 
relative_path) const {
     if (has_buckets()) {
-        uint32_t N = static_cast<uint32_t>(_buckets.size());
-        uint32_t bucket_index = vectorized::schema_util::variant_sparse_shard_of(
+        auto N = static_cast<uint32_t>(_buckets.size());
+        auto bucket_index = vectorized::schema_util::variant_sparse_shard_of(
                 StringRef {relative_path.data(), relative_path.size()}, N);
         DCHECK(bucket_index < _buckets.size());
         std::string key = std::string(SPARSE_COLUMN_PATH) + ".b" + 
std::to_string(bucket_index);
@@ -210,8 +419,7 @@ bool VariantColumnReader::exist_in_sparse_column(
     // Check if path exist in sparse column
     bool existed_in_sparse_column =
             !_statistics->sparse_column_non_null_size.empty() &&
-            _statistics->sparse_column_non_null_size.find(relative_path.get_path()) !=
-                    _statistics->sparse_column_non_null_size.end();
+            _statistics->sparse_column_non_null_size.contains(relative_path.get_path());
     const std::string& prefix = relative_path.get_path() + ".";
     bool prefix_existed_in_sparse_column =
             !_statistics->sparse_column_non_null_size.empty() &&
@@ -327,9 +535,9 @@ Status VariantColumnReader::_create_sparse_merge_reader(ColumnIteratorUPtr* iter
         // If bucketized sparse column is requested (per-bucket sparse output 
column),
         // only collect subcolumns that belong to this bucket to avoid extra 
IO.
         if (bucket_index.has_value() && _sparse_reader.has_buckets()) {
-            uint32_t N = static_cast<uint32_t>(_sparse_reader.num_buckets());
+            auto N = static_cast<uint32_t>(_sparse_reader.num_buckets());
             if (N > 1) {
-                uint32_t b = vectorized::schema_util::variant_sparse_shard_of(
+                auto b = vectorized::schema_util::variant_sparse_shard_of(
                         StringRef {path.data(), path.size()}, N);
                 if (b != bucket_index.value()) {
                     continue; // prune subcolumns of other buckets early
@@ -416,11 +624,12 @@ Result<SparseColumnCacheSPtr> VariantColumnReader::_get_shared_column_cache(
     return sparse_column_cache_ptr->at(path);
 }
 
+// NOLINTNEXTLINE(readability-function-cognitive-complexity,readability-function-size)
 Status VariantColumnReader::_new_iterator_with_flat_leaves(
         ColumnIteratorUPtr* iterator, const TabletColumn& target_col,
         const StorageReadOptions* opts, bool exceeded_sparse_column_limit,
         bool existed_in_sparse_column, ColumnReaderCache* column_reader_cache,
         PathToSparseColumnCache* sparse_column_cache_ptr) {
     // make sure external meta is loaded otherwise can't find any meta data 
for extracted columns
     RETURN_IF_ERROR(load_external_meta_once());
 
@@ -444,8 +653,7 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(
             return Status::OK();
         }
         // Case 2: bucketized sparse column path: __DORIS_VARIANT_SPARSE__.b{i}
-        if (rel.rfind(std::string(SPARSE_COLUMN_PATH) + ".b", 0) == 0 &&
-            _sparse_reader.has_buckets()) {
+        if (rel.starts_with(std::string(SPARSE_COLUMN_PATH) + ".b") && _sparse_reader.has_buckets()) {
             // parse bucket index
             uint32_t bucket_index = static_cast<uint32_t>(
                     atoi(rel.substr(std::string(SPARSE_COLUMN_PATH).size() + 2).c_str()));
@@ -547,11 +755,12 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
     return Status::NotSupported("Not implemented");
 }
 
+// NOLINTNEXTLINE(readability-function-cognitive-complexity,readability-function-size)
 Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
                                          const TabletColumn* target_col,
                                          const StorageReadOptions* opt,
                                          ColumnReaderCache* column_reader_cache,
                                          PathToSparseColumnCache* sparse_column_cache_ptr) {
     int32_t col_uid =
             target_col->unique_id() >= 0 ? target_col->unique_id() : 
target_col->parent_unique_id();
     // root column use unique id, leaf column use parent_unique_id
@@ -568,6 +777,48 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
         node = _subcolumns_meta_info->find_exact(relative_path);
     }
 
+    // Whole NestedGroup access: reading $.nested should return the array<object> itself.
+    if (!relative_path.empty()) {
+        const NestedGroupReader* gr = get_nested_group_reader(relative_path.get_path());
+        if (gr && gr->is_valid()) {
+            *iterator = std::make_unique<NestedGroupWholeIterator>(gr);
+            return Status::OK();
+        }
+    }
+
+    // Check if the path belongs to a NestedGroup: a NestedGroup stores array<object> data
+    // as shared offsets plus flat child columns, preserving element-level associations.
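+    // Illustrative example (hypothetical data): for the rows {"nested": [{"x": 1}, {"x": 2}]}
+    // and {"nested": [{"x": 3}]}, reading $.nested.x yields [[1, 2], [3]], reconstructed from
+    // the flat child values [1, 2, 3] and the shared offsets [2, 3].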
+    if (is_nested_group_path(relative_path)) {
+        auto [array_path, child_relative_path] =
+                vectorized::ColumnVariant::split_array_path(relative_path);
+        const NestedGroupReader* group_reader = get_nested_group_reader(array_path.get_path());
+        if (group_reader && group_reader->is_valid()) {
+            // Find the child reader for this relative path
+            auto child_it = group_reader->child_readers.find(child_relative_path.get_path());
+            if (child_it != group_reader->child_readers.end()) {
+                // Create iterators for offsets and child data
+                ColumnIteratorUPtr offsets_iter;
+                RETURN_IF_ERROR(group_reader->offsets_reader->new_iterator(&offsets_iter, nullptr));
+
+                ColumnIteratorUPtr child_iter;
+                RETURN_IF_ERROR(child_it->second->new_iterator(&child_iter, nullptr));
+
+                // Determine the result type (array of the child type)
+                auto child_type = child_it->second->get_vec_data_type();
+                auto result_type = std::make_shared<vectorized::DataTypeArray>(child_type);
+
+                *iterator = std::make_unique<NestedGroupIterator>(
+                        std::move(offsets_iter), std::move(child_iter), result_type);
+
+                if (opt && opt->stats) {
+                    opt->stats->variant_subtree_leaf_iter_count++;
+                }
+                return Status::OK();
+            }
+        }
+        // If the NestedGroup reader or the child is not found, fall through to other logic
+    }
+
     // Check if path exist in sparse column
     bool existed_in_sparse_column =
             !_statistics->sparse_column_non_null_size.empty() &&
@@ -673,9 +924,10 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
     return Status::OK();
 }
 
+// NOLINTNEXTLINE(readability-function-cognitive-complexity,readability-function-size)
 Status VariantColumnReader::init(const ColumnReaderOptions& opts, ColumnMetaAccessor* accessor,
                                  const std::shared_ptr<SegmentFooterPB>& footer, int32_t column_uid,
                                  uint64_t num_rows, io::FileReaderSPtr file_reader) {
     // init sub columns
     _subcolumns_meta_info = std::make_unique<SubcolumnColumnMetaInfo>();
     _statistics = std::make_unique<VariantStatistics>();
@@ -844,6 +1096,11 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, ColumnMetaAcce
     // try build external meta readers (optional)
     _ext_meta_reader = std::make_unique<VariantExternalMetaReader>();
     RETURN_IF_ERROR(_ext_meta_reader->init_from_footer(footer, file_reader, 
_root_unique_id));
+
+    // Initialize NestedGroup readers based on the segment footer (not on a config flag):
+    // if the segment contains NestedGroup columns (".__ng." marker), build their readers.
+    RETURN_IF_ERROR(_init_nested_group_readers(opts, footer, file_reader));
+
     return Status::OK();
 }
 Status VariantColumnReader::create_reader_from_external_meta(const 
std::string& path,
@@ -953,6 +1210,14 @@ vectorized::DataTypePtr VariantColumnReader::infer_data_type_for_path(
         const TabletColumn& column, const vectorized::PathInData& relative_path,
         bool read_flat_leaves, ColumnReaderCache* cache, int32_t col_uid) const {
     // english only in comments
+    // If this is exactly a NestedGroup array path, expose it as ColumnVariant::NESTED_TYPE
+    // so callers can read $.nested without relying on the root JSONB.
+    if (!relative_path.empty()) {
+        const NestedGroupReader* gr = get_nested_group_reader(relative_path.get_path());
+        if (gr && gr->is_valid()) {
+            return vectorized::ColumnVariant::NESTED_TYPE;
+        }
+    }
     // Locate the subcolumn node by path.
     const auto* node = get_subcolumn_meta_by_path(relative_path);
 
@@ -1061,7 +1326,7 @@ Status VariantRootColumnIterator::next_batch(size_t* n, vectorized::MutableColum
                               assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
                     : assert_cast<vectorized::ColumnVariant&>(*dst);
 
-    auto most_common_type = obj.get_most_common_type();
+    auto most_common_type = obj.get_most_common_type(); // NOLINT(readability-static-accessed-through-instance)
     auto root_column = most_common_type->create_column();
     RETURN_IF_ERROR(_inner_iter->next_batch(n, root_column, has_null));
 
@@ -1077,7 +1342,7 @@ Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const si
                               assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
                     : assert_cast<vectorized::ColumnVariant&>(*dst);
 
-    auto most_common_type = obj.get_most_common_type();
+    auto most_common_type = obj.get_most_common_type(); // NOLINT(readability-static-accessed-through-instance)
     auto root_column = most_common_type->create_column();
     RETURN_IF_ERROR(_inner_iter->read_by_rowids(rowids, count, root_column));
 
@@ -1136,6 +1401,323 @@ Status DefaultNestedColumnIterator::read_by_rowids(const rowid_t* rowids, const
     return Status::OK();
 }
 
+// NestedGroup support implementations
+
+// Helper to find or create nested group reader at the right level
+static NestedGroupReader* find_or_create_nested_group(
+        NestedGroupReaders& readers,
+        const std::string& full_path,
+        size_t depth) {
+    // Parse the path to find the hierarchy
+    // Format: path1.__ng.path2.__ng.path3...
+    std::string ng_marker = ".__ng.";
+
+    // Split by .__ng. to get hierarchy
+    std::vector<std::string> path_parts;
+    size_t start = 0;
+    size_t pos;
+    while ((pos = full_path.find(ng_marker, start)) != std::string::npos) {
+        path_parts.push_back(full_path.substr(start, pos - start));
+        start = pos + ng_marker.length();
+    }
+    path_parts.push_back(full_path.substr(start));
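+    // Split example (hypothetical path): "items.__ng.reviews" yields ["items", "reviews"],
+    // i.e. the group "items" containing the nested group "reviews".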
+
+    if (path_parts.empty()) {
+        return nullptr;
+    }
+
+    // Navigate/create the hierarchy
+    NestedGroupReaders* current_map = &readers;
+    NestedGroupReader* current_reader = nullptr;
+
+    for (size_t i = 0; i < path_parts.size(); ++i) {
+        const std::string& part = path_parts[i];
+        auto it = current_map->find(part);
+
+        if (it == current_map->end()) {
+            // Create new reader
+            auto new_reader = std::make_unique<NestedGroupReader>();
+            new_reader->array_path = part;
+            new_reader->depth = i + 1;
+            current_reader = new_reader.get();
+            (*current_map)[part] = std::move(new_reader);
+        } else {
+            current_reader = it->second.get();
+        }
+
+        if (i + 1 < path_parts.size()) {
+            current_map = &current_reader->nested_group_readers;
+        }
+    }
+
+    return current_reader;
+}
+
+// NOLINTNEXTLINE(readability-function-cognitive-complexity,readability-function-size)
+Status VariantColumnReader::_init_nested_group_readers(
+        const ColumnReaderOptions& opts, const std::shared_ptr<SegmentFooterPB>& footer,
+        const io::FileReaderSPtr& file_reader) {
+    // Scan footer for NestedGroup columns (columns with __ng. prefix)
+    // Pattern: <variant_name>.__ng.<array_path>.__offsets for offsets
+    // Pattern: <variant_name>.__ng.<array_path>.<field_path> for children
+    // Multi-level: <variant>.__ng.a.__ng.b.__offsets for nested groups
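+    // Concrete example (hypothetical variant column `v` storing {"a": [{"x": 1, "b": [{"y": 2}]}]}):
+    //   v.__ng.a.__offsets          -- per-row offsets of array a
+    //   v.__ng.a.x                  -- flat values of a[*].x
+    //   v.__ng.a.__ng.b.__offsets   -- per-element offsets of nested array b
+    //   v.__ng.a.__ng.b.y           -- flat values of a[*].b[*].y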
+
+    std::string ng_prefix = ".__ng.";
+    std::string offsets_suffix = ".__offsets";
+
+    // Collect all columns info first
+    struct ColumnInfo {
+        int ordinal;
+        std::string full_ng_path;  // The full nested group path (may include .__ng.)
+        std::string relative_path; // Relative path for children
+        bool is_offsets;
+        size_t depth;
+    };
+    std::vector<ColumnInfo> column_infos;
+
+    for (int i = 0; i < footer->columns_size(); ++i) {
+        const auto& col = footer->columns(i);
+        if (!col.has_column_path_info()) {
+            continue;
+        }
+
+        const auto& path_info = col.column_path_info();
+        if (!path_info.has_nested_group_parent_path()) {
+            continue;
+        }
+
+        ColumnInfo info;
+        info.ordinal = i;
+        info.full_ng_path = path_info.nested_group_parent_path();
+        info.is_offsets =
+                path_info.has_is_nested_group_offsets() && path_info.is_nested_group_offsets();
+        info.depth = path_info.has_nested_group_depth() ? path_info.nested_group_depth() : 1;
+
+        if (!info.is_offsets) {
+            // Extract relative path
+            std::string path_str = path_info.path();
+            size_t last_ng_pos = path_str.rfind(ng_prefix);
+            if (last_ng_pos != std::string::npos) {
+                size_t after_ng = last_ng_pos + ng_prefix.length();
+                // Find the end of the array path part
+                size_t dot_pos = path_str.find('.', after_ng);
+                while (dot_pos != std::string::npos) {
+                    std::string potential_child = path_str.substr(dot_pos + 1);
+                    // Check if this is still part of the nested group path
+                    if (!potential_child.starts_with("__ng.")) {
+                        info.relative_path = potential_child;
+                        break;
+                    }
+                    dot_pos = path_str.find('.', dot_pos + 1);
+                }
+            }
+        }
+
+        column_infos.push_back(std::move(info));
+    }
+
+    // Create readers for each NestedGroup
+    for (const auto& info : column_infos) {
+        NestedGroupReader* group_reader = find_or_create_nested_group(
+                _nested_group_readers, info.full_ng_path, info.depth);
+
+        if (!group_reader) {
+            continue;
+        }
+
+        const auto& col_meta = footer->columns(info.ordinal);
+
+        if (info.is_offsets) {
+            // Create the offsets reader. Use the column's own row count when present: for
+            // depth > 1 the offsets column has one row per parent element, not per segment row.
+            uint64_t offsets_num_rows = col_meta.has_num_rows() ? col_meta.num_rows() : _num_rows;
+            RETURN_IF_ERROR(ColumnReader::create(opts, col_meta, offsets_num_rows, file_reader,
+                                                 &group_reader->offsets_reader));
+        } else if (!info.relative_path.empty()) {
+            // Create child reader
+            std::shared_ptr<ColumnReader> child_reader;
+            uint64_t child_num_rows = col_meta.has_num_rows() ? col_meta.num_rows() : _num_rows;
+            RETURN_IF_ERROR(ColumnReader::create(opts, col_meta, child_num_rows, file_reader,
+                                                 &child_reader));
+            group_reader->child_readers[info.relative_path] = std::move(child_reader);
+        }
+    }
+
+    return Status::OK();
+}
+
+// Helper to recursively find a nested group reader
+static const NestedGroupReader* find_nested_group_reader_recursive(
+        const NestedGroupReaders& readers,
+        const std::string& target_path) {
+    for (const auto& [path, reader] : readers) {
+        if (path == target_path) {
+            return reader.get();
+        }
+        // Check nested groups
+        const NestedGroupReader* nested_result =
+                find_nested_group_reader_recursive(reader->nested_group_readers, target_path);
+        if (nested_result) {
+            return nested_result;
+        }
+    }
+    return nullptr;
+}
+
+const NestedGroupReader* VariantColumnReader::get_nested_group_reader(
+        const std::string& array_path) const {
+    // First try direct lookup
+    auto it = _nested_group_readers.find(array_path);
+    if (it != _nested_group_readers.end()) {
+        return it->second.get();
+    }
+    // Try recursive search for nested groups
+    return find_nested_group_reader_recursive(_nested_group_readers, array_path);
+}
+
+bool VariantColumnReader::is_nested_group_path(const vectorized::PathInData& path) const {
+    auto [array_path, relative_path] = vectorized::ColumnVariant::split_array_path(path);
+    if (array_path.empty()) {
+        return false;
+    }
+    return get_nested_group_reader(array_path.get_path()) != nullptr;
+}
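+
+// Path-splitting example (hypothetical): with a NestedGroup registered for "a.b",
+// split_array_path turns "a.b.x" into ("a.b", "x"); paths whose array prefix has no
+// registered group are not treated as NestedGroup paths.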
+
+// NestedGroupIterator implementations
+
+Status NestedGroupIterator::init(const ColumnIteratorOptions& opts) {
+    RETURN_IF_ERROR(_offsets_iter->init(opts));
+    RETURN_IF_ERROR(_child_iter->init(opts));
+    return Status::OK();
+}
+
+Status NestedGroupIterator::seek_to_ordinal(ordinal_t ord_idx) {
+    _current_ordinal = ord_idx;
+    RETURN_IF_ERROR(_offsets_iter->seek_to_ordinal(ord_idx));
+    // For child iterator, we need to seek to the flat position
+    // corresponding to this row ordinal
+    // This requires reading the offset at ord_idx-1 (or 0 if ord_idx == 0)
+    if (ord_idx > 0) {
+        std::vector<uint64_t> prev_offset;
+        RETURN_IF_ERROR(_read_offsets(ord_idx - 1, 1, &prev_offset));
+        if (!prev_offset.empty()) {
+            RETURN_IF_ERROR(_child_iter->seek_to_ordinal(prev_offset[0]));
+        }
+    } else {
+        RETURN_IF_ERROR(_child_iter->seek_to_ordinal(0));
+    }
+    return Status::OK();
+}
+
+Status NestedGroupIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
+                                       bool* has_null) {
+    // Read offsets for these rows
+    std::vector<uint64_t> offsets;
+    RETURN_IF_ERROR(_read_offsets(_current_ordinal, *n, &offsets));
+
+    // Calculate how many child elements to read. The start offset is the previous row's
+    // end offset (or 0 if this is the first row).
+    size_t start_offset = 0;
+    if (_current_ordinal > 0) {
+        std::vector<uint64_t> prev_offsets;
+        RETURN_IF_ERROR(_read_offsets(_current_ordinal - 1, 1, &prev_offsets));
+        if (!prev_offsets.empty()) {
+            start_offset = prev_offsets[0];
+        }
+    }
+    size_t end_offset = offsets.empty() ? start_offset : offsets.back();
+    size_t num_elements = end_offset - start_offset;
+
+    // Read flat child data. The child column must use the array's element type,
+    // not _result_type itself (which is the array type).
+    const auto& array_type = assert_cast<const vectorized::DataTypeArray&>(*_result_type);
+    auto child_col = array_type.get_nested_type()->create_column();
+    if (num_elements > 0) {
+        bool child_has_null = false;
+        RETURN_IF_ERROR(_child_iter->next_batch(&num_elements, child_col, &child_has_null));
+    }
+
+    // Build the array column from offsets and child data
+    auto* dst_array = assert_cast<vectorized::ColumnArray*>(dst.get());
+
+    // Convert absolute offsets to array offsets (relative to what dst already holds)
+    auto& dst_offsets = dst_array->get_offsets();
+    size_t prev_offset = dst_offsets.empty() ? 0 : dst_offsets.back();
+    for (size_t i = 0; i < offsets.size(); ++i) {
+        size_t array_size = (i == 0) ? (offsets[i] - start_offset) : (offsets[i] - offsets[i - 1]);
+        dst_offsets.push_back(prev_offset + array_size);
+        prev_offset = dst_offsets.back();
+    }
+
+    // Insert child data
+    dst_array->get_data().insert_range_from(*child_col, 0, child_col->size());
+
+    _current_ordinal += *n;
+    *has_null = false;
+    return Status::OK();
+}
+
+Status NestedGroupIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
+                                           vectorized::MutableColumnPtr& dst) {
+    // For random access we read the offsets of each row and then fetch the corresponding
+    // child elements; this is more complex and less efficient than sequential access.
+    auto* dst_array = assert_cast<vectorized::ColumnArray*>(dst.get());
+    auto& dst_offsets = dst_array->get_offsets();
+    const auto& array_type = assert_cast<const vectorized::DataTypeArray&>(*_result_type);
+
+    for (size_t i = 0; i < count; ++i) {
+        rowid_t rowid = rowids[i];
+
+        // Read the offsets of this row and (when it exists) the previous row
+        vectorized::MutableColumnPtr offset_col = vectorized::ColumnOffset64::create();
+        size_t start_offset = 0;
+        if (rowid > 0) {
+            RETURN_IF_ERROR(_offsets_iter->seek_to_ordinal(rowid - 1));
+            size_t n = 2;
+            bool has_null = false;
+            RETURN_IF_ERROR(_offsets_iter->next_batch(&n, offset_col, &has_null));
+            auto* offset_data = assert_cast<vectorized::ColumnOffset64*>(offset_col.get());
+            start_offset = offset_data->get_data()[0];
+        } else {
+            RETURN_IF_ERROR(_offsets_iter->seek_to_ordinal(0));
+            size_t n = 1;
+            bool has_null = false;
+            RETURN_IF_ERROR(_offsets_iter->next_batch(&n, offset_col, &has_null));
+        }
+
+        auto* offset_data = assert_cast<vectorized::ColumnOffset64*>(offset_col.get());
+        size_t end_offset = offset_data->get_data().back();
+        size_t num_elements = end_offset - start_offset;
+
+        // Read the child elements of this row; use the array's element type (see next_batch)
+        auto child_col = array_type.get_nested_type()->create_column();
+        if (num_elements > 0) {
+            RETURN_IF_ERROR(_child_iter->seek_to_ordinal(start_offset));
+            bool child_has_null = false;
+            RETURN_IF_ERROR(_child_iter->next_batch(&num_elements, child_col, &child_has_null));
+        }
+
+        // Add to destination array
+        size_t prev_offset = dst_offsets.empty() ? 0 : dst_offsets.back();
+        dst_offsets.push_back(prev_offset + num_elements);
+        dst_array->get_data().insert_range_from(*child_col, 0, child_col->size());
+    }
+
+    return Status::OK();
+}
+
+Status NestedGroupIterator::_read_offsets(ordinal_t start_ordinal, size_t num_rows,
+                                          std::vector<uint64_t>* offsets_out) {
+    RETURN_IF_ERROR(_offsets_iter->seek_to_ordinal(start_ordinal));
+
+    vectorized::MutableColumnPtr offset_col = vectorized::ColumnOffset64::create();
+    bool has_null = false;
+    RETURN_IF_ERROR(_offsets_iter->next_batch(&num_rows, offset_col, &has_null));
+
+    auto* offset_data = assert_cast<vectorized::ColumnOffset64*>(offset_col.get());
+    offsets_out->assign(offset_data->get_data().begin(), offset_data->get_data().end());
+
+    return Status::OK();
+}
+
 #include "common/compile_check_end.h"
 
 } // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
index 5c21e167a0f..87bdcd3ac2d 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
@@ -200,6 +200,24 @@ using SparseColumnCacheSPtr = std::shared_ptr<SparseColumnCache>;
 using PathToSparseColumnCache = std::unordered_map<std::string, SparseColumnCacheSPtr>;
 using PathToSparseColumnCacheUPtr = std::unique_ptr<PathToSparseColumnCache>;
 
+// Forward declaration
+struct NestedGroupReader;
+
+// Map from array path to NestedGroupReader
+using NestedGroupReaders = std::unordered_map<std::string, std::unique_ptr<NestedGroupReader>>;
+
+// Holds readers for a single NestedGroup (offsets + child columns + nested groups)
+struct NestedGroupReader {
+    std::string array_path;
+    size_t depth = 1; // Nesting depth (1 = first level)
+    std::shared_ptr<ColumnReader> offsets_reader;
+    std::unordered_map<std::string, std::shared_ptr<ColumnReader>> child_readers;
+    // Nested groups within this group (for multi-level nesting)
+    NestedGroupReaders nested_group_readers;
+
+    bool is_valid() const { return offsets_reader != nullptr; }
+};
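+// Layout example (hypothetical): a variant with groups "a" and "a.__ng.b" is stored as
+// { "a" -> NestedGroupReader{ child_readers = {...}, nested_group_readers = { "b" -> ... } } }.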
+
 class VariantColumnReader : public ColumnReader {
 public:
     VariantColumnReader() = default;
@@ -286,7 +304,21 @@ public:
     // 3) Externalized metas via `_ext_meta_reader`
     bool has_prefix_path(const vectorized::PathInData& relative_path) const;
 
+    // NestedGroup support
+    // Get NestedGroup reader for a given array path
+    const NestedGroupReader* get_nested_group_reader(const std::string& array_path) const;
+
+    // Check if a path belongs to a NestedGroup
+    bool is_nested_group_path(const vectorized::PathInData& path) const;
+
+    // Get all NestedGroup readers
+    const NestedGroupReaders& get_nested_group_readers() const { return _nested_group_readers; }
+
 private:
+    // Initialize NestedGroup readers from footer metadata
+    Status _init_nested_group_readers(const ColumnReaderOptions& opts,
+                                      const std::shared_ptr<SegmentFooterPB>& footer,
+                                      const io::FileReaderSPtr& file_reader);
     // init for compaction read
     Status _new_default_iter_with_same_nested(ColumnIteratorUPtr* iterator, 
const TabletColumn& col,
                                               const StorageReadOptions* opt,
@@ -333,6 +365,9 @@ private:
     uint32_t _root_unique_id {0};
 
     // call-once guard moved into VariantExternalMetaReader
+
+    // NestedGroup readers for array<object> paths
+    NestedGroupReaders _nested_group_readers;
 };
 
 class VariantRootColumnIterator : public ColumnIterator {
@@ -416,6 +451,40 @@ private:
     ordinal_t _current_rowid = 0;
 };
 
+// Iterator for reading a child column from a NestedGroup
+// Reads flat child data and reconstructs array semantics using shared offsets
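+// Reconstruction example (illustrative): flat child values [1, 2, 3, 4, 5] combined with
+// shared offsets [2, 5] produce the per-row arrays [[1, 2], [3, 4, 5]].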
+class NestedGroupIterator : public ColumnIterator {
+public:
+    NestedGroupIterator(ColumnIteratorUPtr offsets_iter, ColumnIteratorUPtr child_iter,
+                        vectorized::DataTypePtr result_type)
+            : _offsets_iter(std::move(offsets_iter)),
+              _child_iter(std::move(child_iter)),
+              _result_type(std::move(result_type)) {}
+
+    ~NestedGroupIterator() override = default;
+
+    Status init(const ColumnIteratorOptions& opts) override;
+
+    Status seek_to_ordinal(ordinal_t ord_idx) override;
+
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;
+
+    Status read_by_rowids(const rowid_t* rowids, const size_t count,
+                          vectorized::MutableColumnPtr& dst) override;
+
+    ordinal_t get_current_ordinal() const override { return _current_ordinal; }
+
+private:
+    // Read offsets for the given row range
+    Status _read_offsets(ordinal_t start_ordinal, size_t num_rows,
+                         std::vector<uint64_t>* offsets_out);
+
+    ColumnIteratorUPtr _offsets_iter;
+    ColumnIteratorUPtr _child_iter;
+    vectorized::DataTypePtr _result_type;
+    ordinal_t _current_ordinal = 0;
+};
+
 #include "common/compile_check_end.h"
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp
index 0e350ced4ed..fe93152bd53 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp
@@ -599,6 +599,10 @@ Status VariantColumnWriterImpl::finalize() {
         // process sparse column and append to sparse writer buffer
         RETURN_IF_ERROR(
                 _process_sparse_column(ptr, olap_data_convertor.get(), 
num_rows, column_id));
+
+        // NestedGroup processing: expansion from JSONB is handled by NestedGroupBuilder
+        // and will be wired in here in a subsequent phase.
+        // TODO: call _build_nested_groups_from_jsonb() and _write_nested_groups() here
     }
 
     _is_finalized = true;
@@ -847,6 +851,148 @@ Status VariantSubcolumnWriter::append_nullable(const uint8_t* null_map, const ui
     return Status::OK();
 }
 
+// Helper function to recursively write a NestedGroup
+static Status write_nested_group_recursive(
+        const vectorized::ColumnVariant::NestedGroup* group,
+        const std::string& path_prefix,
+        const TabletColumn* tablet_column,
+        const ColumnWriterOptions& base_opts,
+        vectorized::OlapBlockDataConvertor* converter,
+        size_t parent_num_rows,
+        int& column_id,
+        std::unordered_map<std::string, NestedGroupWriter>& writers,
+        VariantStatistics& statistics,
+        size_t depth) {
+
+    if (!group || group->is_disabled) {
+        return Status::OK();
+    }
+
+    std::string full_path = path_prefix.empty() ? group->path.get_path()
+                                                : path_prefix + ".__ng." + group->path.get_path();
+    auto& group_writer = writers[full_path];
+
+    // 1. Create and write offsets column
+    std::string offsets_col_name = tablet_column->name_lower_case() + ".__ng." +
+                                   full_path + ".__offsets";
+
+    TabletColumn offsets_column;
+    offsets_column.set_name(offsets_col_name);
+    offsets_column.set_type(FieldType::OLAP_FIELD_TYPE_BIGINT);
+    offsets_column.set_is_nullable(false);
+    offsets_column.set_length(sizeof(uint64_t));
+    offsets_column.set_index_length(sizeof(uint64_t));
+
+    group_writer.offsets_opts = base_opts;
+    group_writer.offsets_opts.meta = base_opts.footer->add_columns();
+    _init_column_meta(group_writer.offsets_opts.meta, column_id, offsets_column,
+                      base_opts.compression_type);
+
+    // Mark as NestedGroup offsets in metadata
+    auto* path_info = group_writer.offsets_opts.meta->mutable_column_path_info();
+    path_info->set_is_nested_group_offsets(true);
+    path_info->set_nested_group_parent_path(full_path);
+    path_info->set_path(offsets_col_name);
+    path_info->set_nested_group_depth(static_cast<uint32_t>(depth));
+
+    RETURN_IF_ERROR(ColumnWriter::create(group_writer.offsets_opts, &offsets_column,
+                                         base_opts.file_writer, &group_writer.offsets_writer));
+    RETURN_IF_ERROR(group_writer.offsets_writer->init());
+
+    // Write offsets data; use an explicit upcast to avoid the implicit conversion from
+    // MutablePtr to immutable ColumnPtr.
+    vectorized::ColumnPtr offsets_col =
+            static_cast<const vectorized::IColumn&>(*group->offsets).get_ptr();
+    size_t offsets_num_rows = offsets_col->size();
+    converter->add_column_data_convertor(offsets_column);
+    RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
+            {offsets_col, nullptr, ""}, 0, offsets_num_rows, column_id));
+    auto [status, converted] = converter->convert_column_data(column_id);
+    RETURN_IF_ERROR(status);
+    RETURN_IF_ERROR(group_writer.offsets_writer->append(converted->get_nullmap(),
+                                                        converted->get_data(), offsets_num_rows));
+    converter->clear_source_content(column_id);
+    group_writer.offsets_opts.meta->set_num_rows(offsets_num_rows);
+    ++column_id;
+
+    // 2. Write child columns
+    for (const auto& [relative_path, subcolumn] : group->children) {
+        std::string child_col_name = tablet_column->name_lower_case() + ".__ng." +
+                                     full_path + "." + relative_path.get_path();
+
+        const auto& child_type = subcolumn.get_least_common_type();
+        TabletColumn child_column = vectorized::schema_util::get_column_by_type(
+                child_type, child_col_name,
+                vectorized::schema_util::ExtraInfo {
+                        .unique_id = -1,
+                        .parent_unique_id = tablet_column->unique_id(),
+                        .path_info = vectorized::PathInData(child_col_name)});
+
+        ColumnWriterOptions child_opts = base_opts;
+        child_opts.meta = base_opts.footer->add_columns();
+        _init_column_meta(child_opts.meta, column_id, child_column, base_opts.compression_type);
+
+        // Mark child as part of NestedGroup
+        auto* child_path_info = child_opts.meta->mutable_column_path_info();
+        child_path_info->set_nested_group_parent_path(full_path);
+        child_path_info->set_path(child_col_name);
+        child_path_info->set_nested_group_depth(static_cast<uint32_t>(depth));
+
+        std::unique_ptr<ColumnWriter> child_writer;
+        RETURN_IF_ERROR(ColumnWriter::create(child_opts, &child_column, base_opts.file_writer,
+                                             &child_writer));
+        RETURN_IF_ERROR(child_writer->init());
+
+        // Get the child's finalized data (flat values)
+        auto child_col = subcolumn.get_finalized_column_ptr();
+        size_t child_num_rows = child_col->size();
+
+        converter->add_column_data_convertor(child_column);
+        RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
+                {child_col, nullptr, ""}, 0, child_num_rows, column_id));
+        auto [child_status, child_converted] = converter->convert_column_data(column_id);
+        RETURN_IF_ERROR(child_status);
+        RETURN_IF_ERROR(child_writer->append(child_converted->get_nullmap(),
+                                             child_converted->get_data(), child_num_rows));
+        converter->clear_source_content(column_id);
+        child_opts.meta->set_num_rows(child_num_rows);
+
+        group_writer.child_writers[relative_path.get_path()] = std::move(child_writer);
+        group_writer.child_opts[relative_path.get_path()] = child_opts;
+        ++column_id;
+    }
+
+    // 3. Recursively write nested groups within this group
+    for (const auto& [nested_path, nested_group] : group->nested_groups) {
+        RETURN_IF_ERROR(write_nested_group_recursive(
+                nested_group.get(), full_path, tablet_column, base_opts, converter,
+                group->current_flat_size, column_id, writers, statistics, depth + 1));
+    }
+
+    // Update statistics
+    NestedGroupInfoPB info;
+    info.set_element_count(static_cast<uint32_t>(group->current_flat_size));
+    info.set_child_count(
+            static_cast<uint32_t>(group->children.size() + group->nested_groups.size()));
+    info.set_has_conflict(group->is_disabled);
+    statistics.nested_group_info[full_path] = info;
+
+    return Status::OK();
+}
+
+Status VariantColumnWriterImpl::_process_nested_groups(vectorized::ColumnVariant* ptr,
+                                                       vectorized::OlapBlockDataConvertor* converter,
+                                                       size_t num_rows, int& column_id) {
+    const auto& nested_groups = ptr->get_nested_groups();
+
+    for (const auto& [array_path, group] : nested_groups) {
+        RETURN_IF_ERROR(write_nested_group_recursive(
+                group.get(), "", _tablet_column, _opts, converter,
+                num_rows, column_id, _nested_group_writers, _statistics, 1));
+    }
+
+    return Status::OK();
+}
+
 #include "common/compile_check_end.h"
 
 } // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.h b/be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.h
index 48753051548..e13f5003446 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.h
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.h
@@ -108,6 +108,14 @@ private:
     int _first_column_id = -1;
 };
 
+// NestedGroup writers for array<object> paths (offsets + child writers).
+struct NestedGroupWriter {
+    std::unique_ptr<ColumnWriter> offsets_writer;
+    std::unordered_map<std::string, std::unique_ptr<ColumnWriter>> child_writers;
+    ColumnWriterOptions offsets_opts;
+    std::unordered_map<std::string, ColumnWriterOptions> child_opts;
+};
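+// Example (hypothetical): the group "a" of variant column `v` holds offsets_writer for
+// "v.__ng.a.__offsets" and child_writers["x"] for "v.__ng.a.x".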
+
 class VariantColumnWriterImpl {
 public:
     VariantColumnWriterImpl(const ColumnWriterOptions& opts, const 
TabletColumn* column);
@@ -137,6 +145,11 @@ private:
     Status _process_subcolumns(vectorized::ColumnVariant* ptr,
                                vectorized::OlapBlockDataConvertor* converter, 
size_t num_rows,
                                int& column_id);
+
+    // NestedGroup support
+    Status _process_nested_groups(vectorized::ColumnVariant* ptr,
+                                  vectorized::OlapBlockDataConvertor* converter, size_t num_rows,
+                                  int& column_id);
     // prepare a column for finalize
     doris::vectorized::MutableColumnPtr _column;
     doris::vectorized::ColumnUInt8 _null_column;
@@ -158,6 +171,7 @@ private:
 
     // hold the references of subcolumns info
     std::unordered_map<std::string, TabletSchema::SubColumnInfo> 
_subcolumns_info;
+    std::unordered_map<std::string, NestedGroupWriter> _nested_group_writers;
 };
 
 void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const 
TabletColumn& column,
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_statistics.h b/be/src/olap/rowset/segment_v2/variant/variant_statistics.h
index 8f01f354978..b2b84312957 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_statistics.h
+++ b/be/src/olap/rowset/segment_v2/variant/variant_statistics.h
@@ -17,11 +17,14 @@
 
 #pragma once
 
+#include <gen_cpp/segment_v2.pb.h>
+
 #include <map>
 #include <string>
 
-namespace doris {
-namespace segment_v2 {
+#include "common/logging.h"
+
+namespace doris::segment_v2 {
 
 #include "common/compile_check_begin.h"
 
@@ -29,11 +32,16 @@ struct VariantStatistics {
     // If reached the size of this, we should stop writing statistics for sparse data
     std::map<std::string, int64_t> subcolumns_non_null_size;
     std::map<std::string, int64_t> sparse_column_non_null_size;
+    // NestedGroup statistics (key: array path string)
+    std::map<std::string, NestedGroupInfoPB> nested_group_info;
 
     void to_pb(VariantStatisticsPB* stats) const {
         for (const auto& [path, value] : sparse_column_non_null_size) {
             stats->mutable_sparse_column_non_null_size()->emplace(path, value);
         }
+        for (const auto& [path, info] : nested_group_info) {
+            (*stats->mutable_nested_group_info())[path] = info;
+        }
         LOG(INFO) << "num subcolumns " << subcolumns_non_null_size.size() << ", num sparse columns "
                   << sparse_column_non_null_size.size();
     }
@@ -42,10 +50,12 @@ struct VariantStatistics {
         for (const auto& [path, value] : stats.sparse_column_non_null_size()) {
             sparse_column_non_null_size[path] = value;
         }
+        for (const auto& [path, info] : stats.nested_group_info()) {
+            nested_group_info[path] = info;
+        }
     }
 };
 
 #include "common/compile_check_end.h"
 
-} // namespace segment_v2
-} // namespace doris
\ No newline at end of file
+} // namespace doris::segment_v2
\ No newline at end of file
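
A small round-trip sketch of the to_pb/from_pb pattern above, with plain structs standing in
for the generated protobuf types (the real NestedGroupInfoPB comes from gen_cpp/segment_v2.pb.h):

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <string>

    struct ToyGroupInfo { uint32_t element_count = 0; uint32_t child_count = 0; bool has_conflict = false; };
    struct ToyStatsPB { std::map<std::string, ToyGroupInfo> nested_group_info; };

    struct ToyStats {
        std::map<std::string, ToyGroupInfo> nested_group_info;
        void to_pb(ToyStatsPB* pb) const { pb->nested_group_info = nested_group_info; }
        void from_pb(const ToyStatsPB& pb) { nested_group_info = pb.nested_group_info; }
    };

    int main() {
        ToyStats written;
        written.nested_group_info["items"] = {6, 2, false};
        ToyStatsPB pb;
        written.to_pb(&pb);    // segment write path
        ToyStats read_back;
        read_back.from_pb(pb); // segment read path
        assert(read_back.nested_group_info.at("items").element_count == 6);
    }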
diff --git a/be/src/vec/columns/column_variant.cpp b/be/src/vec/columns/column_variant.cpp
index c23d7a3b5ee..3ef6a9fea1f 100644
--- a/be/src/vec/columns/column_variant.cpp
+++ b/be/src/vec/columns/column_variant.cpp
@@ -20,7 +20,7 @@
 
 #include "vec/columns/column_variant.h"
 
-#include <assert.h>
+#include <cassert>
 #include <fmt/core.h>
 #include <fmt/format.h>
 #include <glog/logging.h>
@@ -79,6 +79,27 @@ namespace {
 
 #include "common/compile_check_begin.h"
 
+// Deep-clone the nested group tree (offsets + nested groups).
+std::shared_ptr<ColumnVariant::NestedGroup> clone_nested_group_tree(
+        const ColumnVariant::NestedGroup& src) {
+    auto dst = std::make_shared<ColumnVariant::NestedGroup>();
+    dst->path = src.path;
+    dst->current_flat_size = src.current_flat_size;
+    dst->is_disabled = src.is_disabled;
+    dst->expected_type = src.expected_type;
+    dst->expected_array_depth = src.expected_array_depth;
+    dst->children = src.children;
+    if (src.offsets) {
+        dst->offsets = src.offsets->clone_resized(src.offsets->size())->assume_mutable();
+    }
+    for (const auto& [p, ng] : src.nested_groups) {
+        if (ng) {
+            dst->nested_groups.emplace(p, clone_nested_group_tree(*ng));
+        }
+    }
+    return dst;
+}
+
 DataTypePtr create_array_of_type(PrimitiveType type, size_t num_dimensions, bool is_nullable,
                                  int precision = -1, int scale = -1) {
     DataTypePtr result = type == PrimitiveType::INVALID_TYPE
@@ -1254,6 +1275,232 @@ bool ColumnVariant::has_subcolumn(const PathInData& key) const {
     return subcolumns.find_leaf(key) != nullptr;
 }
 
+// NestedGroup management implementations
+
+ColumnVariant::NestedGroup* ColumnVariant::get_or_create_nested_group(
+        const PathInData& array_path) {
+    auto it = _nested_groups.find(array_path);
+    if (it != _nested_groups.end()) {
+        return it->second.get();
+    }
+
+    auto group = std::make_shared<NestedGroup>();
+    group->path = array_path;
+    group->offsets = ColumnOffset64::create();
+
+    auto* ptr = group.get();
+    _nested_groups[array_path] = std::move(group);
+    return ptr;
+}
+
+ColumnVariant::NestedGroup* ColumnVariant::get_nested_group(const PathInData& array_path) {
+    auto it = _nested_groups.find(array_path);
+    if (it != _nested_groups.end()) {
+        return it->second.get();
+    }
+    return nullptr;
+}
+
+const ColumnVariant::NestedGroup* ColumnVariant::get_nested_group(
+        const PathInData& array_path) const {
+    auto it = _nested_groups.find(array_path);
+    if (it != _nested_groups.end()) {
+        return it->second.get();
+    }
+    return nullptr;
+}
+
+bool ColumnVariant::check_path_conflict(const PathInData& path, size_t array_depth,
+                                        const FieldInfo& field_info) const {
+    // Check if path is already disabled
+    if (_disabled_paths.contains(path)) {
+        return true; // Conflict detected
+    }
+
+    // Check existing NestedGroup for this path
+    auto it = _nested_groups.find(path);
+    if (it == _nested_groups.end()) {
+        return false; // No existing data, no conflict
+    }
+
+    const auto* group = it->second.get();
+    if (group->expected_type == NestedGroup::StructureType::UNKNOWN) {
+        return false; // First time seeing this path
+    }
+
+    // Conflict cases:
+    // 1. Current is scalar, expected array
+    // 2. Current is array, expected scalar
+    // 3. Array depth mismatch
+
+    bool current_is_array = (array_depth > 0);
+    bool expected_is_array = (group->expected_type == NestedGroup::StructureType::ARRAY ||
+                              group->expected_type == NestedGroup::StructureType::OBJECT);
+
+    if (current_is_array != expected_is_array) {
+        return true; // Structure conflict
+    }
+
+    if (current_is_array && array_depth != group->expected_array_depth) {
+        return true; // Array dimension conflict
+    }
+
+    return false;
+}
+
+void ColumnVariant::disable_path(const PathInData& path) {
+    _disabled_paths.insert(path);
+
+    // If there's a NestedGroup for this path, mark it as disabled
+    auto it = _nested_groups.find(path);
+    if (it != _nested_groups.end()) {
+        it->second->is_disabled = true;
+    }
+}
+
+const ColumnVariant::Subcolumn* ColumnVariant::get_subcolumn_from_nested_group(
+        const PathInData& path) const {
+    auto [array_path, relative_path] = split_array_path(path);
+    if (array_path.empty()) {
+        return nullptr; // Not a nested path
+    }
+
+    auto it = _nested_groups.find(array_path);
+    if (it == _nested_groups.end()) {
+        return nullptr;
+    }
+
+    const auto* group = it->second.get();
+    auto child_it = group->children.find(relative_path);
+    if (child_it == group->children.end()) {
+        return nullptr;
+    }
+
+    return &child_it->second;
+}
+
+std::pair<PathInData, PathInData> ColumnVariant::split_array_path(const PathInData& path) {
+    const auto& parts = path.get_parts();
+
+    // Find the FIRST array<object> part (marked with is_nested = true)
+    // Deeper nestings will be stored in their original form (as JSONB or array)
+    // This ensures we maintain element-level associations for the outermost array<object>
+    // while avoiding complexity of recursive nesting
+    //
+    // Example: {"a": [{"b": [{"c": 123}]}]}
+    //   path = a.b.c (a is_nested=true, b is_nested=true)
+    //   Result: array_path="a", relative_path="b.c"
+    //   The "b.c" path's value [[{"c": 123}]] is stored as-is in children["b"]
+    for (size_t i = 0; i < parts.size(); ++i) {
+        if (parts[i].is_nested) {
+            // Split at first nested position
+            PathInData::Parts array_parts(parts.begin(), parts.begin() + i + 1);
+            PathInData::Parts relative_parts(parts.begin() + i + 1, parts.end());
+            return {PathInData(array_parts), PathInData(relative_parts)};
+        }
+    }
+
+    // No array<object> in path
+    return {PathInData {}, path};
+}
+
+std::vector<std::pair<PathInData, size_t>> ColumnVariant::get_nested_levels(
+        const PathInData& path) {
+    const auto& parts = path.get_parts();
+    std::vector<std::pair<PathInData, size_t>> levels;
+    size_t depth = 0;
+
+    for (size_t i = 0; i < parts.size(); ++i) {
+        if (parts[i].is_nested) {
+            ++depth;
+            // Build the path up to this nested level
+            PathInData::Parts level_parts(parts.begin(), parts.begin() + i + 1);
+            levels.emplace_back(PathInData(level_parts), depth);
+        }
+    }
+
+    return levels;
+}
+
+PathInData ColumnVariant::get_leaf_path_after_nested(const PathInData& path) {
+    const auto& parts = path.get_parts();
+
+    // Find the last nested part
+    size_t last_nested_pos = std::string::npos;
+    for (size_t i = 0; i < parts.size(); ++i) {
+        if (parts[i].is_nested) {
+            last_nested_pos = i;
+        }
+    }
+
+    if (last_nested_pos == std::string::npos || last_nested_pos + 1 >= parts.size()) {
+        // No nested part or nested part is the last element
+        return PathInData {};
+    }
+
+    // Return the path after the last nested part
+    PathInData::Parts leaf_parts(parts.begin() + last_nested_pos + 1, parts.end());
+    return PathInData(leaf_parts);
+}
+
+ColumnPtr ColumnVariant::get_finalized_column_from_nested_group(const PathInData& path) const {
+    auto [array_path, relative_path] = split_array_path(path);
+    if (array_path.empty()) {
+        return nullptr;
+    }
+
+    auto it = _nested_groups.find(array_path);
+    if (it == _nested_groups.end()) {
+        return nullptr;
+    }
+
+    const auto* group = it->second.get();
+    if (!group || group->is_disabled) {
+        return nullptr;
+    }
+
+    auto child_it = group->children.find(relative_path);
+    if (child_it == group->children.end()) {
+        return nullptr;
+    }
+
+    // Get the child column's finalized data
+    const auto& child = child_it->second;
+    auto child_col = child.get_finalized_column_ptr();
+
+    // Rebuild array column using offsets
+    // The offsets define how the flat child data maps back to row-level arrays
+    auto offsets_clone = group->offsets->clone_resized(group->offsets->size());
+    auto array_col = ColumnArray::create(child_col->clone_resized(child_col->size()),
+                                         std::move(offsets_clone));
+
+    return array_col;
+}
+
+// Helper function to recursively finalize a NestedGroup
+static void finalize_nested_group_recursive(ColumnVariant::NestedGroup* group,
+                                            ColumnVariant::FinalizeMode mode) {
+    if (!group || group->is_disabled) {
+        return;
+    }
+
+    // Finalize each child subcolumn
+    for (auto& [child_path, child] : group->children) {
+        if (!child.is_finalized()) {
+            child.finalize(mode);
+        }
+    }
+
+    // Recursively finalize nested groups within this group
+    for (auto& [nested_path, nested_group] : group->nested_groups) {
+        finalize_nested_group_recursive(nested_group.get(), mode);
+    }
+}
+
+void ColumnVariant::finalize_nested_groups(FinalizeMode mode) {
+    for (auto& [array_path, group] : _nested_groups) {
+        finalize_nested_group_recursive(group.get(), mode);
+    }
+}
+
 bool ColumnVariant::add_sub_column(const PathInData& key, MutableColumnPtr&& subcolumn,
                                    DataTypePtr type) {
     size_t new_size = subcolumn->size();
@@ -1860,6 +2107,10 @@ void ColumnVariant::finalize(FinalizeMode mode) {
             std::count_if(new_subcolumns.begin(), new_subcolumns.end(),
                           [](const auto& entry) { return entry->path.has_nested_part(); });
     std::swap(subcolumns, new_subcolumns);
+
+    // Finalize NestedGroup children
+    finalize_nested_groups(mode);
+
     _prev_positions.clear();
     ENABLE_CHECK_CONSISTENCY(this);
 }
@@ -2401,6 +2652,16 @@ MutableColumnPtr ColumnVariant::clone() const {
     auto sparse_column = std::move(*column).mutate();
     res->serialized_sparse_column = sparse_column->assume_mutable();
     res->set_num_rows(num_rows);
+
+    // Clone NestedGroups and disabled paths.
+    res->_disabled_paths = _disabled_paths;
+    res->_nested_groups.clear();
+    res->_nested_groups.reserve(_nested_groups.size());
+    for (const auto& [p, group] : _nested_groups) {
+        if (group) {
+            res->_nested_groups.emplace(p, clone_nested_group_tree(*group));
+        }
+    }
     ENABLE_CHECK_CONSISTENCY(res.get());
     return res;
 }
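
The split_array_path contract above is easiest to see end to end. A self-contained model
where parts are reduced to (key, is_nested) pairs (the real PathInData carries more state):

    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    struct ToyPart { std::string key; bool is_nested = false; };
    using ToyParts = std::vector<ToyPart>;

    // Split at the FIRST is_nested part: everything up to and including it is the
    // array path; the remainder is the relative path stored under that group.
    std::pair<ToyParts, ToyParts> split_array_path(const ToyParts& parts) {
        for (size_t i = 0; i < parts.size(); ++i) {
            if (parts[i].is_nested) {
                return {ToyParts(parts.begin(), parts.begin() + i + 1),
                        ToyParts(parts.begin() + i + 1, parts.end())};
            }
        }
        return {ToyParts {}, parts}; // no array<object> anywhere in the path
    }

    int main() {
        // {"a": [{"b": [{"c": 123}]}]} -> path a.b.c, with a and b both nested
        ToyParts path = {{"a", true}, {"b", true}, {"c", false}};
        auto [array_path, relative] = split_array_path(path);
        // array_path = "a"; relative = "b.c", stored as-is in children["b.c"]
        std::cout << array_path[0].key << " | " << relative[0].key << "." << relative[1].key << "\n";
    }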
diff --git a/be/src/vec/columns/column_variant.h b/be/src/vec/columns/column_variant.h
index 9c38d5441a3..db96ceabede 100644
--- a/be/src/vec/columns/column_variant.h
+++ b/be/src/vec/columns/column_variant.h
@@ -39,6 +39,7 @@
 #include "vec/columns/column.h"
 #include "vec/columns/column_map.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/columns/column_vector.h"
 #include "vec/columns/subcolumn_tree.h"
 #include "vec/common/cow.h"
 #include "vec/common/string_ref.h"
@@ -264,6 +265,60 @@ public:
     };
     using Subcolumns = SubcolumnsTree<Subcolumn, false>;
 
+    /**
+     * Represents a nested array group with shared offsets.
+     * Each array<object> path gets its own NestedGroup to maintain
+     * element-level field associations.
+     *
+     * Example: for JSON {"items": [{"a":1,"b":"x"}, {"a":2,"b":"y"}]}
+     *   - NestedGroup path = "items"
+     *   - offsets = [2] (one row with 2 array elements)
+     *   - children = {"a": [1,2], "b": ["x","y"]}
+     */
+    struct NestedGroup {
+        // The path this group represents (e.g., "voltage_list")
+        PathInData path;
+
+        // Offsets array: prefix-sum of array sizes per row
+        // offsets[i] = total number of elements in rows [0, i]
+        // Example: if rows have arrays of sizes [2, 3, 1], offsets = [2, 5, 6]
+        MutableColumnPtr offsets;
+
+        // Child subcolumns under this array path
+        // Key: relative path under this array (e.g., "voltage", "current")
+        // Value: the subcolumn data (flat values, using shared offsets)
+        std::unordered_map<PathInData, Subcolumn, PathInData::Hash> children;
+
+        // For nested arrays within this group
+        // Key: relative path to child array
+        std::unordered_map<PathInData, std::shared_ptr<NestedGroup>, PathInData::Hash> nested_groups;
+
+        // Current total flat size (sum of all array elements so far)
+        size_t current_flat_size = 0;
+
+        // Conflict tracking: if true, this path is disabled and falls back to JSONB
+        bool is_disabled = false;
+
+        // Expected structure info for conflict detection
+        enum struct StructureType { UNKNOWN, SCALAR, ARRAY, OBJECT };
+        StructureType expected_type = StructureType::UNKNOWN;
+        size_t expected_array_depth = 0;
+
+        size_t size() const { return current_flat_size; }
+        size_t num_rows() const { return offsets ? offsets->size() : 0; }
+
+        // Initialize offsets column if not created
+        void ensure_offsets() {
+            if (!offsets) {
+                offsets = ColumnOffset64::create();
+            }
+        }
+    };
+
+    // Map from array path to NestedGroup
+    using NestedGroupsMap =
+            std::unordered_map<PathInData, std::shared_ptr<NestedGroup>, 
PathInData::Hash>;
+
 private:
     /// If true then all subcolumns are nullable.
     const bool is_nullable;
@@ -288,6 +343,15 @@ private:
     // subcolumns count materialized from nested paths
     size_t nested_path_count = 0;
 
+    // NestedGroup storage for array<object> paths
+    // Key: array path (e.g., "items", "data.records")
+    // Value: NestedGroup containing shared offsets and child columns
+    NestedGroupsMap _nested_groups;
+
+    // Paths that have been disabled due to structure conflicts
+    // Data for these paths falls back to root JSONB
+    std::unordered_set<PathInData, PathInData::Hash> _disabled_paths;
+
 public:
     static constexpr auto COLUMN_NAME_DUMMY = "_dummy";
 
@@ -387,6 +451,56 @@ public:
 
     ColumnPtr get_sparse_column() const { return serialized_sparse_column; }
 
+    // NestedGroup management APIs
+    // Get or create a NestedGroup for the given array path
+    NestedGroup* get_or_create_nested_group(const PathInData& array_path);
+
+    // Get an existing NestedGroup (returns nullptr if not found)
+    NestedGroup* get_nested_group(const PathInData& array_path);
+    const NestedGroup* get_nested_group(const PathInData& array_path) const;
+
+    // Get all NestedGroups (for iteration during write/finalize)
+    const NestedGroupsMap& get_nested_groups() const { return _nested_groups; }
+    NestedGroupsMap& get_nested_groups() { return _nested_groups; }
+
+    // Check if a path has structure conflict with existing data
+    bool check_path_conflict(const PathInData& path, size_t array_depth,
+                             const FieldInfo& field_info) const;
+
+    // Disable a path (fallback to JSONB)
+    void disable_path(const PathInData& path);
+
+    // Check if a path is disabled
+    bool is_path_disabled(const PathInData& path) const {
+        return _disabled_paths.contains(path);
+    }
+
+    // Get subcolumn from NestedGroup (for reading)
+    const Subcolumn* get_subcolumn_from_nested_group(const PathInData& path) const;
+
+    // Get finalized column from NestedGroup, reconstructing array semantics
+    // Returns array column with proper offsets
+    ColumnPtr get_finalized_column_from_nested_group(const PathInData& path) const;
+
+    // Finalize all NestedGroup children
+    void finalize_nested_groups(FinalizeMode mode);
+
+    // Split a path at the array<object> boundary
+    // Returns: (array_path, relative_path)
+    // If no array<object> in path, array_path is empty
+    static std::pair<PathInData, PathInData> split_array_path(const PathInData& path);
+
+    // Get all nested levels in a path
+    // Returns: vector of (array_path, depth) pairs for each nested level
+    // Example: for path "a.b.c.d" where a and c are nested:
+    //   Returns: [("a", 1), ("a.b.c", 2)]
+    static std::vector<std::pair<PathInData, size_t>> get_nested_levels(const PathInData& path);
+
+    // Get the relative path after removing all nested prefixes
+    // Example: for path "a.b.c.d" where a and c are nested:
+    //   Returns: "d" (the leaf path after the deepest nested level)
+    static PathInData get_leaf_path_after_nested(const PathInData& path);
+
     // use sparse_subcolumns_schema to record sparse column's path info and type
     static MutableColumnPtr create_sparse_column_fn() {
         return vectorized::ColumnMap::create(vectorized::ColumnString::create(),
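
The offsets convention documented on NestedGroup is easiest to check with numbers: rows whose
arrays have sizes [2, 3, 1] produce offsets [2, 5, 6], and row i's elements live in the flat
child data at [offsets[i-1], offsets[i]). A self-contained worked example:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
        // Child "a" flattened across three rows with array sizes [2, 3, 1].
        std::vector<int> flat_a = {1, 2, 3, 4, 5, 6};
        std::vector<uint64_t> offsets = {2, 5, 6}; // prefix-sum, as in NestedGroup::offsets
        for (size_t row = 0; row < offsets.size(); ++row) {
            uint64_t begin = (row == 0) ? 0 : offsets[row - 1];
            std::cout << "row " << row << ":";
            for (uint64_t i = begin; i < offsets[row]; ++i) std::cout << ' ' << flat_a[i];
            std::cout << '\n';
        }
    }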
diff --git a/be/src/vec/json/json_parser.cpp b/be/src/vec/json/json_parser.cpp
index 00d023b6c7c..ea6030c6f24 100644
--- a/be/src/vec/json/json_parser.cpp
+++ b/be/src/vec/json/json_parser.cpp
@@ -20,7 +20,7 @@
 
 #include "vec/json/json_parser.h"
 
-#include <assert.h>
+#include <cassert>
 #include <fmt/format.h>
 #include <glog/logging.h>
 
@@ -28,7 +28,7 @@
 #include <string_view>
 
 #include "common/cast_set.h"
-#include "common/config.h"
+// IWYU pragma: keep
 #include "common/status.h"
 #include "vec/json/path_in_data.h"
 #include "vec/json/simd_json_parser.h"
@@ -44,6 +44,8 @@ std::optional<ParseResult> JSONDataParser<ParserImpl>::parse(const char* begin,
         return {};
     }
     ParseContext context;
+    // enable_flatten_nested controls nested path traversal;
+    // NestedGroup expansion is now handled at the storage layer.
     context.enable_flatten_nested = config.enable_flatten_nested;
     context.is_top_array = document.isArray();
     traverse(document, context);
@@ -62,11 +64,8 @@ void JSONDataParser<ParserImpl>::traverse(const Element& element, ParseContext&
     if (element.isObject()) {
         traverseObject(element.getObject(), ctx);
     } else if (element.isArray()) {
-        if (ctx.has_nested_in_flatten) {
-            throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
-                                   "Nesting of array in Nested array within variant subcolumns is "
-                                   "currently not supported.");
-        }
+        // Allow nested arrays (multi-level) for NestedGroup; deeper levels are
+        // handled by VariantNestedBuilder with a max-depth guard.
         has_nested = false;
         check_has_nested_object(element);
         ctx.has_nested_in_flatten = has_nested && ctx.enable_flatten_nested;
@@ -225,7 +224,8 @@ void JSONDataParser<ParserImpl>::traverseArrayElement(const Element& element,
         }
     }
 
-    if (keys_to_update && !(is_top_array && ctx.has_nested_in_flatten)) {
+    // Always fill missed values to keep element-level association between keys.
+    if (keys_to_update) {
         fillMissedValuesInArrays(ctx);
     }
 }
@@ -254,11 +254,8 @@ void JSONDataParser<ParserImpl>::handleExistingPath(std::pair<PathInData::Parts,
                                                     ParseArrayContext& ctx,
                                                     size_t& keys_to_update) {
     auto& path_array = path_data.second;
-    // For top_array structure we no need to check cur array size equals ctx.current_size
-    // because we do not need to maintain the association information between Nested in array
-    if (!(ctx.is_top_array && ctx.has_nested_in_flatten)) {
-        assert(path_array.size() == ctx.current_size);
-    }
+    // Keep arrays aligned for all keys (including top-level arrays).
+    assert(path_array.size() == ctx.current_size);
     // If current element of array is part of Nested,
     // collect its size or check it if the size of
     // the Nested has been already collected.
@@ -285,11 +282,8 @@ void JSONDataParser<ParserImpl>::handleNewPath(UInt128 hash, const PathInData::P
     Array path_array;
     path_array.reserve(ctx.total_size);
 
-    // For top_array structure we no need to resize array
-    // because we no need to fill default values for maintaining the association information between Nested in array
-    if (!(ctx.is_top_array && ctx.has_nested_in_flatten)) {
-        path_array.resize(ctx.current_size);
-    }
+    // Always resize to keep alignment.
+    path_array.resize(ctx.current_size);
 
     auto nested_key = getNameOfNested(path, value);
     if (!nested_key.empty()) {
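
The parser changes above drop the top-array special cases so that every key seen inside an
array keeps a value array of the same length. A standalone sketch of that alignment invariant,
with ints standing in for Fields and 0 for the default value:

    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    int main() {
        // [{"a":1}, {"a":2,"b":9}] -> a = [1, 2], b = [0, 9] (0 backfills the missed element)
        std::vector<std::map<std::string, int>> elements = {{{"a", 1}}, {{"a", 2}, {"b", 9}}};
        std::map<std::string, std::vector<int>> columns;
        size_t seen = 0;
        for (const auto& elem : elements) {
            for (const auto& [key, value] : elem) {
                auto& col = columns[key];
                col.resize(seen, 0); // handleNewPath: resize to the current element count
                col.push_back(value);
            }
            ++seen;
            for (auto& [key, col] : columns) col.resize(seen, 0); // fillMissedValuesInArrays
        }
        for (const auto& [key, col] : columns) {
            std::cout << key << ":";
            for (int v : col) std::cout << ' ' << v;
            std::cout << '\n';
        }
    }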
diff --git a/be/src/vec/json/parse2column.cpp b/be/src/vec/json/parse2column.cpp
index 127241b3667..7aa0696ce38 100644
--- a/be/src/vec/json/parse2column.cpp
+++ b/be/src/vec/json/parse2column.cpp
@@ -51,6 +51,107 @@
 
 namespace doris::vectorized {
 
+namespace {
+
+// Parse JSON into (paths, values). If the JSON is invalid and strict mode is disabled,
+// fall back to treating it as a plain string field at the root.
+template <typename ParserImpl>
+std::optional<ParseResult> parse_json_or_fallback_as_string(const char* src, size_t length,
+                                                            JSONDataParser<ParserImpl>* parser,
+                                                            const ParseConfig& config) {
+    std::optional<ParseResult> result;
+    if (length > 0) {
+        result = parser->parse(src, length, config);
+    } else {
+        result = ParseResult {};
+    }
+    if (result) {
+        return result;
+    }
+    VLOG_DEBUG << "failed to parse " << std::string_view(src, length) << ", length= " << length;
+    if (config::variant_throw_exeception_on_invalid_json) {
+        throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to parse object {}",
+                               std::string_view(src, length));
+    }
+    PathInData root_path;
+    Field field = Field::create_field<TYPE_STRING>(String(src, length));
+    return ParseResult {.paths = {root_path}, .values = {field}};
+}
+
+inline void check_ambiguous_paths_or_fallback(const ColumnVariant& column_variant,
+                                              const std::vector<PathInData>& paths,
+                                              bool enable_flatten_nested) {
+    if (!enable_flatten_nested) {
+        return;
+    }
+    std::vector<PathInData> check_paths;
+    check_paths.reserve(column_variant.get_subcolumns().size() + paths.size());
+    for (const auto& entry : column_variant.get_subcolumns()) {
+        check_paths.push_back(entry->path);
+    }
+    check_paths.insert(check_paths.end(), paths.begin(), paths.end());
+    auto st = vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths);
+    if (st.ok()) {
+        return;
+    }
+    // When ambiguous paths are detected, log a warning and continue;
+    // NestedGroup expansion will be handled at the storage layer.
+    LOG(WARNING) << "Ambiguous variant nested paths detected, will fall back to JSONB for "
+                    "conflicting paths. "
+                 << st;
+}
+
+inline void insert_paths_legacy(ColumnVariant& column_variant, std::vector<PathInData>& paths,
+                                std::vector<Field>& values, size_t old_num_rows) {
+    for (size_t i = 0; i < paths.size(); ++i) {
+        FieldInfo field_info;
+        schema_util::get_field_info(values[i], &field_info);
+        if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
+            continue;
+        }
+        if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
+            if (paths[i].has_nested_part()) {
+                column_variant.add_nested_subcolumn(paths[i], field_info, old_num_rows);
+            } else {
+                column_variant.add_sub_column(paths[i], old_num_rows);
+            }
+        }
+        auto* subcolumn = column_variant.get_subcolumn(paths[i], i);
+        if (!subcolumn) {
+            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}",
+                                   paths[i].get_path());
+        }
+        if (subcolumn->cur_num_of_defaults() > 0) {
+            subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
+            subcolumn->reset_current_num_of_defaults();
+        }
+        if (subcolumn->size() != old_num_rows) {
+            throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                                   "subcolumn {} size mismatched, may contain duplicated entry",
+                                   paths[i].get_path());
+        }
+        subcolumn->insert(std::move(values[i]), field_info);
+    }
+}
+
+inline void fill_missed_subcolumns(ColumnVariant& column_variant, size_t old_num_rows) {
+    const auto& subcolumns = column_variant.get_subcolumns();
+    for (const auto& entry : subcolumns) {
+        if (entry->data.size() != old_num_rows) {
+            continue;
+        }
+        if (entry->path.has_nested_part()) {
+            bool success = UNLIKELY(column_variant.try_insert_default_from_nested(entry));
+            if (!success) {
+                entry->data.insert_default();
+            }
+        } else {
+            entry->data.increment_default_counter();
+        }
+    }
+}
+
+} // namespace
+
 /** Pool for objects that cannot be used from different threads simultaneously.
   * Allows to create an object for each thread.
   * Pool has unbounded size and objects are not destroyed before destruction 
of pool.
@@ -133,84 +234,18 @@ template <typename ParserImpl>
 void parse_json_to_variant(IColumn& column, const char* src, size_t length,
                            JSONDataParser<ParserImpl>* parser, const ParseConfig& config) {
     auto& column_variant = assert_cast<ColumnVariant&>(column);
-    std::optional<ParseResult> result;
-    /// Treat empty string as an empty object
-    /// for better CAST from String to Object.
-    if (length > 0) {
-        result = parser->parse(src, length, config);
-    } else {
-        result = ParseResult {};
-    }
-    if (!result) {
-        VLOG_DEBUG << "failed to parse " << std::string_view(src, length) << ", length= " << length;
-        if (config::variant_throw_exeception_on_invalid_json) {
-            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to parse object {}",
-                                   std::string_view(src, length));
-        }
-        // Treat as string
-        PathInData root_path;
-        Field field = Field::create_field<TYPE_STRING>(String(src, length));
-        result = ParseResult {{root_path}, {field}};
-    }
+    auto result = parse_json_or_fallback_as_string(src, length, parser, config);
     auto& [paths, values] = *result;
     assert(paths.size() == values.size());
     size_t old_num_rows = column_variant.rows();
-    if (config.enable_flatten_nested) {
-        // here we should check the paths in variant and paths in result,
-        // if two paths which same prefix have different structure, we should throw an exception
-        std::vector<PathInData> check_paths;
-        for (const auto& entry : column_variant.get_subcolumns()) {
-            check_paths.push_back(entry->path);
-        }
-        check_paths.insert(check_paths.end(), paths.begin(), paths.end());
-        THROW_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths));
-    }
-    for (size_t i = 0; i < paths.size(); ++i) {
-        FieldInfo field_info;
-        schema_util::get_field_info(values[i], &field_info);
-        if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
-            continue;
-        }
-        if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
-            if (paths[i].has_nested_part()) {
-                column_variant.add_nested_subcolumn(paths[i], field_info, old_num_rows);
-            } else {
-                column_variant.add_sub_column(paths[i], old_num_rows);
-            }
-        }
-        auto* subcolumn = column_variant.get_subcolumn(paths[i], i);
-        if (!subcolumn) {
-            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}",
-                                   paths[i].get_path());
-        }
-        if (subcolumn->cur_num_of_defaults() > 0) {
-            subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
-            subcolumn->reset_current_num_of_defaults();
-        }
-        if (subcolumn->size() != old_num_rows) {
-            throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
-                                   "subcolumn {} size missmatched, may contains duplicated entry",
-                                   paths[i].get_path());
-        }
-        subcolumn->insert(std::move(values[i]), std::move(field_info));
-    }
-    // /// Insert default values to missed subcolumns.
-    const auto& subcolumns = column_variant.get_subcolumns();
-    for (const auto& entry : subcolumns) {
-        if (entry->data.size() == old_num_rows) {
-            // Handle nested paths differently from simple paths
-            if (entry->path.has_nested_part()) {
-                // Try to insert default from nested, if failed, insert regular default
-                bool success = UNLIKELY(column_variant.try_insert_default_from_nested(entry));
-                if (!success) {
-                    entry->data.insert_default();
-                }
-            } else {
-                // For non-nested paths, increment default counter
-                entry->data.increment_default_counter();
-            }
-        }
-    }
+    check_ambiguous_paths_or_fallback(column_variant, paths, config.enable_flatten_nested);
+
+    // Always use the legacy path insertion; NestedGroup expansion moved to the storage layer.
+    insert_paths_legacy(column_variant, paths, values, old_num_rows);
+
+    // Insert default values to missed subcolumns.
+    fill_missed_subcolumns(column_variant, old_num_rows);
+
     column_variant.incr_num_rows();
     auto sparse_column = column_variant.get_sparse_column();
     if (sparse_column->size() == old_num_rows) {
@@ -224,7 +259,7 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length,
 // exposed interfaces
 void parse_json_to_variant(IColumn& column, const StringRef& json, JsonParser* parser,
                            const ParseConfig& config) {
-    return parse_json_to_variant(column, json.data, json.size, parser, config);
+    parse_json_to_variant(column, json.data, json.size, parser, config);
 }
 
 void parse_json_to_variant(IColumn& column, const ColumnString& raw_json_column,
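
The control flow of parse_json_or_fallback_as_string reduces to the sketch below. The stub
parser and result types are illustrative; the strict flag plays the role of
config::variant_throw_exeception_on_invalid_json:

    #include <iostream>
    #include <optional>
    #include <stdexcept>
    #include <string>
    #include <vector>

    struct ToyResult { std::vector<std::string> paths; std::vector<std::string> values; };

    std::optional<ToyResult> toy_parse(const std::string& s) {
        if (s.empty() || s.front() != '{') return std::nullopt; // pretend only objects parse
        return ToyResult {{"k"}, {"v"}};
    }

    ToyResult parse_or_fallback(const std::string& s, bool strict) {
        if (s.empty()) return ToyResult {};            // empty string -> empty object
        if (auto r = toy_parse(s)) return *r;
        if (strict) throw std::runtime_error("Failed to parse object " + s);
        return ToyResult {{""}, {s}};                  // root path carries the raw string
    }

    int main() {
        std::cout << parse_or_fallback("not json", /*strict=*/false).values[0] << "\n";
    }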
diff --git a/be/src/vec/json/path_in_data.h b/be/src/vec/json/path_in_data.h
index 1e21859a7e7..5416472248b 100644
--- a/be/src/vec/json/path_in_data.h
+++ b/be/src/vec/json/path_in_data.h
@@ -57,6 +57,14 @@ public:
         /// "k1" is nested and has anonymous_array_level = 0.
         /// "k2" and "k3" are not nested and have anonymous_array_level = 2.
         UInt8 anonymous_array_level = 0;
+
+        /// Get the total array depth for this part.
+        /// Used for NestedGroup offset indexing.
+        /// If is_nested (array<object>), the depth is anonymous_array_level + 1.
+        /// Otherwise it's just the anonymous_array_level.
+        UInt8 get_array_depth() const {
+            return is_nested ? (anonymous_array_level + 1) : anonymous_array_level;
+        }
+        }
     };
     using Parts = std::vector<Part>;
     PathInData() = default;
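
A worked example of get_array_depth with illustrative stand-in types, matching the comment
above: a nested part at anonymous_array_level 0 has depth 1, while a non-nested part at
level 2 keeps depth 2:

    #include <cstdint>
    #include <iostream>

    struct ToyPart {
        bool is_nested = false;
        uint8_t anonymous_array_level = 0;
        uint8_t get_array_depth() const {
            return is_nested ? (anonymous_array_level + 1) : anonymous_array_level;
        }
    };

    int main() {
        ToyPart k1 {true, 0};  // "k1": nested, level 0 -> depth 1
        ToyPart k2 {false, 2}; // "k2": not nested, level 2 -> depth 2
        std::cout << int(k1.get_array_depth()) << " " << int(k2.get_array_depth()) << "\n";
    }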
diff --git a/be/src/vec/json/variant_nested_builder.cpp b/be/src/vec/json/variant_nested_builder.cpp
new file mode 100644
index 00000000000..f4017fc8f64
--- /dev/null
+++ b/be/src/vec/json/variant_nested_builder.cpp
@@ -0,0 +1,349 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/json/variant_nested_builder.h"
+
+#include <glog/logging.h>
+
+#include <unordered_map>
+
+#include "vec/columns/column_vector.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/schema_util.h"
+
+namespace doris::vectorized {
+
+VariantNestedBuilder::VariantNestedBuilder(ColumnVariant& column_variant)
+        : _column_variant(column_variant) {}
+
+void VariantNestedBuilder::build(const std::vector<PathInData>& paths,
+                                 const std::vector<Field>& values, size_t old_num_rows) {
+    // Group paths by their first-level nested path
+    // Key: first-level array_path
+    // Value: list of (path_index, all_levels) pairs
+    using PathInfo = std::pair<size_t, std::vector<std::pair<PathInData, size_t>>>;
+    std::unordered_map<PathInData, std::vector<PathInfo>, PathInData::Hash> paths_by_first_level;
+
+    // Separate paths into nested and non-nested
+    std::vector<size_t> non_nested_indices;
+
+    for (size_t i = 0; i < paths.size(); ++i) {
+        auto levels = ColumnVariant::get_nested_levels(paths[i]);
+
+        if (levels.empty()) {
+            // Non-nested path
+            non_nested_indices.push_back(i);
+        } else if (levels[0].second > _max_depth) {
+            // Exceeds max depth, treat as non-nested (will be stored as JSONB)
+            non_nested_indices.push_back(i);
+        } else {
+            // Group by first level
+            paths_by_first_level[levels[0].first].emplace_back(i, std::move(levels));
+        }
+    }
+
+    // Process non-nested paths
+    for (size_t idx : non_nested_indices) {
+        process_path_value(paths[idx], values[idx], old_num_rows);
+    }
+
+    // Process each first-level nested group
+    for (auto& [first_level_path, path_infos] : paths_by_first_level) {
+        // Check for conflict
+        FieldInfo dummy_info;
+        if (_column_variant.check_path_conflict(first_level_path, 1, dummy_info)) {
+            VLOG_DEBUG << "Path conflict detected for array path: " << first_level_path.get_path();
+            // Disable the path and fall back to regular subcolumn processing
+            _column_variant.disable_path(first_level_path);
+
+            // Write conflicting paths to regular subcolumns (will be stored as JSONB)
+            for (const auto& [path_idx, levels] : path_infos) {
+                process_path_value(paths[path_idx], values[path_idx], old_num_rows);
+            }
+            continue;
+        }
+
+        // Get or create first-level NestedGroup
+        auto* group = _column_variant.get_or_create_nested_group(first_level_path);
+        if (!group) {
+            LOG(WARNING) << "Failed to create NestedGroup for path: "
+                         << first_level_path.get_path();
+            continue;
+        }
+
+        // Determine the array size from the first value (all paths in the same group
+        // should have the same outer array size)
+        size_t first_level_array_size = 0;
+        if (!path_infos.empty()) {
+            FieldInfo info = get_field_info(values[path_infos[0].first]);
+            first_level_array_size = get_array_size(values[path_infos[0].first], info);
+        }
+
+        // Update first-level offsets
+        group->ensure_offsets();
+        auto& offsets_col = assert_cast<ColumnOffset64&>(*group->offsets);
+        size_t prev_offset = offsets_col.empty() ? 0 : offsets_col.get_data().back();
+        offsets_col.insert_value(prev_offset + first_level_array_size);
+        group->current_flat_size = prev_offset + first_level_array_size;
+
+        // Set expected type
+        if (group->expected_type == ColumnVariant::NestedGroup::StructureType::UNKNOWN) {
+            group->expected_type = ColumnVariant::NestedGroup::StructureType::ARRAY;
+            group->expected_array_depth = 1;
+        }
+
+        // Process each path in this group
+        for (const auto& [path_idx, levels] : path_infos) {
+            PathInData leaf_path = ColumnVariant::get_leaf_path_after_nested(paths[path_idx]);
+            FieldInfo info = get_field_info(values[path_idx]);
+            write_to_nested_group(levels, leaf_path, values[path_idx], info, old_num_rows);
+        }
+    }
+}
+
+void VariantNestedBuilder::write_to_nested_group(
+        const std::vector<std::pair<PathInData, size_t>>& levels,
+        const PathInData& leaf_path,
+        const Field& value, const FieldInfo& info, size_t old_num_rows) {
+    if (levels.empty()) {
+        return;
+    }
+
+    // Get the first-level group
+    auto* first_group = _column_variant.get_nested_group(levels[0].first);
+    if (!first_group || first_group->is_disabled) {
+        return;
+    }
+
+    // Start recursive processing from level 0
+    write_to_nested_group_recursive(first_group, levels, 0, leaf_path, value, info, 0);
+}
+
+// NOLINTNEXTLINE(readability-function-cognitive-complexity)
+void VariantNestedBuilder::write_to_nested_group_recursive(
+        ColumnVariant::NestedGroup* parent_group,
+        const std::vector<std::pair<PathInData, size_t>>& levels,
+        size_t current_level_idx,
+        const PathInData& leaf_path,
+        const Field& value, const FieldInfo& info,
+        size_t parent_flat_offset) {
+
+    if (current_level_idx >= levels.size()) {
+        return;
+    }
+
+    bool is_last_level = (current_level_idx == levels.size() - 1);
+    size_t current_depth = levels[current_level_idx].second;
+
+    // Check if exceeds max depth
+    if (current_depth > _max_depth) {
+        _write_to_nested_group_exceed_depth(parent_group, levels, current_level_idx, leaf_path,
+                                            value, info);
+        return;
+    }
+
+    if (is_last_level) {
+        _write_to_nested_group_last_level(parent_group, leaf_path, value, info,
+                                          parent_flat_offset);
+    } else {
+        _write_to_nested_group_non_last_level(parent_group, levels, current_level_idx, leaf_path,
+                                              value, info);
+    }
+}
+
+void VariantNestedBuilder::_write_to_nested_group_exceed_depth(
+        ColumnVariant::NestedGroup* parent_group,
+        const std::vector<std::pair<PathInData, size_t>>& levels,
+        size_t current_level_idx,
+        const PathInData& leaf_path,
+        const Field& value, const FieldInfo& info) {
+    const auto& current_level_path = levels[current_level_idx].first;
+    PathInData relative_path = get_relative_path(
+            leaf_path.empty() ? current_level_path
+                              : PathInData(current_level_path.get_path() + "." + leaf_path.get_path()),
+            levels[0].first);
+    auto it = parent_group->children.find(relative_path);
+    if (it == parent_group->children.end()) {
+        ColumnVariant::Subcolumn subcolumn(0, true);
+        parent_group->children[relative_path] = std::move(subcolumn);
+        it = parent_group->children.find(relative_path);
+    }
+    it->second.insert(value, info);
+}
+
+void VariantNestedBuilder::_write_to_nested_group_last_level(
+        ColumnVariant::NestedGroup* parent_group, const PathInData& leaf_path,
+        const Field& value, const FieldInfo& info, size_t parent_flat_offset) {
+    if (info.num_dimensions > 0 && value.get_type() == PrimitiveType::TYPE_ARRAY) {
+        const auto& arr = value.get<Array>();
+        auto it = parent_group->children.find(leaf_path);
+        if (it == parent_group->children.end()) {
+            ColumnVariant::Subcolumn subcolumn(0, true);
+            if (parent_flat_offset > 0 || parent_group->current_flat_size > arr.size()) {
+                // Pad a newly created child with defaults so it lines up with elements
+                // already written for this group; guard the subtraction against underflow.
+                if (parent_group->current_flat_size > arr.size()) {
+                    subcolumn.insert_many_defaults(parent_group->current_flat_size - arr.size());
+                }
+            }
+            parent_group->children[leaf_path] = std::move(subcolumn);
+            it = parent_group->children.find(leaf_path);
+        }
+        for (const auto& elem : arr) {
+            FieldInfo elem_info;
+            schema_util::get_field_info(elem, &elem_info);
+            it->second.insert(elem, elem_info);
+        }
+        return;
+    }
+    auto it = parent_group->children.find(leaf_path);
+    if (it == parent_group->children.end()) {
+        ColumnVariant::Subcolumn subcolumn(0, true);
+        parent_group->children[leaf_path] = std::move(subcolumn);
+        it = parent_group->children.find(leaf_path);
+    }
+    it->second.insert(value, info);
+}
+
+void VariantNestedBuilder::_write_to_nested_group_non_last_level(
+        ColumnVariant::NestedGroup* parent_group,
+        const std::vector<std::pair<PathInData, size_t>>& levels,
+        size_t current_level_idx,
+        const PathInData& leaf_path,
+        const Field& value, const FieldInfo& info) {
+    const auto& current_level_path = levels[current_level_idx].first;
+    const auto& next_level_path = levels[current_level_idx + 1].first;
+    PathInData relative_nested_path = get_relative_path(next_level_path, current_level_path);
+    auto nested_it = parent_group->nested_groups.find(relative_nested_path);
+    if (nested_it == parent_group->nested_groups.end()) {
+        auto nested_group = std::make_shared<ColumnVariant::NestedGroup>();
+        nested_group->path = relative_nested_path;
+        nested_group->offsets = ColumnOffset64::create();
+        parent_group->nested_groups[relative_nested_path] = std::move(nested_group);
+        nested_it = parent_group->nested_groups.find(relative_nested_path);
+    }
+    auto* nested_group = nested_it->second.get();
+    if (info.num_dimensions <= 0 || value.get_type() != PrimitiveType::TYPE_ARRAY) {
+        return;
+    }
+    const auto& arr = value.get<Array>();
+    for (const auto& elem : arr) {
+        if (elem.get_type() != PrimitiveType::TYPE_ARRAY) {
+            continue;
+        }
+        const auto& inner_arr = elem.get<Array>();
+        nested_group->ensure_offsets();
+        auto& nested_offsets = assert_cast<ColumnOffset64&>(*nested_group->offsets);
+        size_t prev_nested_offset = nested_offsets.empty() ? 0 : nested_offsets.get_data().back();
+        nested_offsets.insert_value(prev_nested_offset + inner_arr.size());
+        nested_group->current_flat_size = prev_nested_offset + inner_arr.size();
+        FieldInfo inner_info;
+        schema_util::get_field_info(elem, &inner_info);
+        write_to_nested_group_recursive(nested_group, levels, current_level_idx + 1, leaf_path,
+                                        elem, inner_info, prev_nested_offset);
+    }
+}
+
+void VariantNestedBuilder::write_to_subcolumn(const PathInData& path, const Field& value,
+                                              const FieldInfo& info, size_t old_num_rows) {
+    // Use existing ColumnVariant API for non-nested paths
+    if (_column_variant.get_subcolumn(path) == nullptr) {
+        if (path.has_nested_part()) {
+            _column_variant.add_nested_subcolumn(path, info, old_num_rows);
+        } else {
+            _column_variant.add_sub_column(path, old_num_rows);
+        }
+    }
+
+    auto* subcolumn = _column_variant.get_subcolumn(path);
+    if (!subcolumn) {
+        LOG(WARNING) << "Failed to get subcolumn for path: " << path.get_path();
+        return;
+    }
+
+    // Handle pending defaults
+    if (subcolumn->cur_num_of_defaults() > 0) {
+        subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
+        subcolumn->reset_current_num_of_defaults();
+    }
+
+    // Insert the value
+    subcolumn->insert(value, info);
+}
+
+void VariantNestedBuilder::process_path_value(const PathInData& path, const Field& value,
+                                              size_t old_num_rows) {
+    FieldInfo info = get_field_info(value);
+
+    if (info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
+        // Skip invalid types
+        return;
+    }
+
+    write_to_subcolumn(path, value, info, old_num_rows);
+}
+
+PathInData VariantNestedBuilder::get_relative_path(const PathInData& full_path,
+                                                    const PathInData& prefix) {
+    const auto& full_parts = full_path.get_parts();
+    const auto& prefix_parts = prefix.get_parts();
+
+    if (prefix_parts.size() >= full_parts.size()) {
+        return PathInData {};
+    }
+
+    // Verify prefix matches
+    for (size_t i = 0; i < prefix_parts.size(); ++i) {
+        if (full_parts[i].key != prefix_parts[i].key) {
+            return full_path; // Prefix doesn't match, return full path
+        }
+    }
+
+    // Return the remaining parts
+    PathInData::Parts relative_parts(full_parts.begin() + prefix_parts.size(), full_parts.end());
+    return PathInData(relative_parts);
+}
+
+FieldInfo VariantNestedBuilder::get_field_info(const Field& value) {
+    FieldInfo info;
+    schema_util::get_field_info(value, &info);
+    return info;
+}
+
+size_t VariantNestedBuilder::get_array_size(const Field& value, const FieldInfo& info) {
+    if (info.num_dimensions > 0 && value.get_type() == PrimitiveType::TYPE_ARRAY) {
+        return value.get<Array>().size();
+    }
+    // Scalar value treated as single element
+    return 1;
+}
+
+void VariantNestedBuilder::flatten_array_values(const Field& value, size_t target_depth,
+                                                std::vector<Field>& flat_values,
+                                                std::vector<size_t>& level_sizes) {
+    if (target_depth == 0 || value.get_type() != PrimitiveType::TYPE_ARRAY) {
+        flat_values.push_back(value);
+        return;
+    }
+
+    const auto& arr = value.get<Array>();
+    level_sizes.push_back(arr.size());
+
+    for (const auto& elem : arr) {
+        flatten_array_values(elem, target_depth - 1, flat_values, level_sizes);
+    }
+}
+
+} // namespace doris::vectorized
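
Putting the pieces together: a compact stand-alone walk-through of what build() produces for
the documented example {"items": [{"a":1,"b":"x"}, {"a":2,"b":"y"}]}. STL containers stand in
for the real column types:

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <variant>
    #include <vector>

    int main() {
        using Value = std::variant<int, std::string>;
        std::vector<uint64_t> offsets;                                 // NestedGroup::offsets
        std::unordered_map<std::string, std::vector<Value>> children;  // NestedGroup::children

        // One row whose "items" array has two elements.
        std::vector<std::unordered_map<std::string, Value>> items = {
                {{"a", 1}, {"b", std::string("x")}},
                {{"a", 2}, {"b", std::string("y")}},
        };
        uint64_t prev = offsets.empty() ? 0 : offsets.back();
        offsets.push_back(prev + items.size());                        // shared prefix-sum update
        for (const auto& elem : items) {
            for (const auto& [path, v] : elem) children[path].push_back(v);
        }
        // items.a and items.b stay associated by element index: (a[0], b[0]) = (1, "x").
        std::cout << "offsets.back()=" << offsets.back() << ", a.size()=" << children["a"].size() << "\n";
    }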
diff --git a/be/src/vec/json/variant_nested_builder.h b/be/src/vec/json/variant_nested_builder.h
new file mode 100644
index 00000000000..5a5a39e84df
--- /dev/null
+++ b/be/src/vec/json/variant_nested_builder.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <utility>
+#include <vector>
+
+#include "vec/columns/column_variant.h"
+#include "vec/core/field.h"
+#include "vec/json/path_in_data.h"
+
+namespace doris::vectorized {
+
+/**
+ * Builder for constructing Variant columns with NestedGroup support.
+ * Maintains the association of fields within array elements.
+ *
+ * When enabled, array<object> paths are stored using shared offsets,
+ * allowing element-level field associations to be preserved.
+ *
+ * Example: for JSON {"items": [{"a":1,"b":"x"}, {"a":2,"b":"y"}]}
+ *   - Without NestedGroup: items.a = [1,2], items.b = ["x","y"] (association lost)
+ *   - With NestedGroup: items has shared offsets, items.a and items.b
+ *                       can be correctly associated by element index
+ */
+class VariantNestedBuilder {
+public:
+    explicit VariantNestedBuilder(ColumnVariant& column_variant);
+
+    /**
+     * Build from parsed JSON result.
+     * Groups paths by their array<object> prefix and writes to NestedGroups.
+     *
+     * @param paths The paths from JSON parsing
+     * @param values The values corresponding to each path
+     * @param old_num_rows Number of rows before this insert
+     */
+    void build(const std::vector<PathInData>& paths, const std::vector<Field>& values,
+               size_t old_num_rows);
+
+    // Set maximum array depth to track with NestedGroup
+    void set_max_depth(size_t max_depth) { _max_depth = max_depth; }
+
+private:
+    // Process a single path-value pair (for non-array paths)
+    void process_path_value(const PathInData& path, const Field& value, size_t old_num_rows);
+
+    // Write value to nested group (supports multi-level nesting)
+    // levels: all nested levels in the path [(path1, depth1), (path2, depth2), ...]
+    // leaf_path: the path after all nested levels
+    void write_to_nested_group(
+            const std::vector<std::pair<PathInData, size_t>>& levels,
+            const PathInData& leaf_path,
+            const Field& value, const FieldInfo& info, size_t old_num_rows);
+
+    // Recursively write to nested group at a specific level
+    void write_to_nested_group_recursive(
+            ColumnVariant::NestedGroup* parent_group,
+            const std::vector<std::pair<PathInData, size_t>>& levels,
+            size_t current_level_idx,
+            const PathInData& leaf_path,
+            const Field& value, const FieldInfo& info,
+            size_t parent_flat_offset);
+
+    // Split complex recursive logic into smaller helpers for readability and linting.
+    void _write_to_nested_group_exceed_depth(ColumnVariant::NestedGroup* parent_group,
+                                             const std::vector<std::pair<PathInData, size_t>>& levels,
+                                             size_t current_level_idx, const PathInData& leaf_path,
+                                             const Field& value, const FieldInfo& info);
+
+    void _write_to_nested_group_last_level(ColumnVariant::NestedGroup* parent_group,
+                                           const PathInData& leaf_path,
+                                           const Field& value, const FieldInfo& info,
+                                           size_t parent_flat_offset);
+
+    void _write_to_nested_group_non_last_level(ColumnVariant::NestedGroup* parent_group,
+                                               const std::vector<std::pair<PathInData, size_t>>& levels,
+                                               size_t current_level_idx, const PathInData& leaf_path,
+                                               const Field& value, const FieldInfo& info);
+
+    // Write to regular subcolumn (for non-nested paths)
+    void write_to_subcolumn(const PathInData& path, const Field& value, const FieldInfo& info,
+                            size_t old_num_rows);
+
+    // Get relative path between two paths
+    static PathInData get_relative_path(const PathInData& full_path, const PathInData& prefix);
+
+    // Get field info from value
+    static FieldInfo get_field_info(const Field& value);
+
+    // Extract array size from a field (for determining offsets)
+    static size_t get_array_size(const Field& value, const FieldInfo& info);
+
+    // Flatten nested array values to the deepest level
+    static void flatten_array_values(const Field& value, size_t target_depth,
+                                     std::vector<Field>& flat_values,
+                                     std::vector<size_t>& level_sizes);
+
+    ColumnVariant& _column_variant;
+    size_t _max_depth = 3;
+};
+
+} // namespace doris::vectorized
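
Before a path is routed into a NestedGroup, the builder applies two fallback rules. A small
sketch, where max_depth plays the role of config::variant_nested_group_max_depth (default 3)
and disabled_paths mirrors ColumnVariant::_disabled_paths; the helper itself is illustrative:

    #include <cstddef>
    #include <iostream>
    #include <set>
    #include <string>

    bool use_nested_group(const std::string& path, size_t nested_depth, size_t max_depth,
                          const std::set<std::string>& disabled_paths) {
        if (nested_depth == 0) return false;              // not an array<object> path
        if (nested_depth > max_depth) return false;       // too deep -> stored as JSONB
        if (disabled_paths.count(path) > 0) return false; // structure conflict -> JSONB
        return true;
    }

    int main() {
        std::set<std::string> disabled = {"bad.path"};
        std::cout << use_nested_group("items", 1, 3, disabled) << " "      // 1
                  << use_nested_group("a.b.c.d", 4, 3, disabled) << " "    // 0
                  << use_nested_group("bad.path", 1, 3, disabled) << "\n"; // 0
    }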
diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto
index 6af2977e7eb..fead642fa1c 100644
--- a/gensrc/proto/segment_v2.proto
+++ b/gensrc/proto/segment_v2.proto
@@ -166,11 +166,33 @@ message ColumnPathInfo {
     optional uint32 parrent_column_unique_id = 4;
     // is_typed flags if the subcolumn is assigned specific types
     optional bool is_typed = 5;
+
+    // NestedGroup support fields
+    // True if this column is the offsets column for a NestedGroup
+    optional bool is_nested_group_offsets = 10;
+    // The parent array path for NestedGroup columns
+    optional string nested_group_parent_path = 11;
+    // Depth level for nested groups (for multi-level nested arrays)
+    optional uint32 nested_group_depth = 12;
+}
+
+// Statistics for a single NestedGroup
+message NestedGroupInfoPB {
+    // Total number of array elements across all rows
+    optional uint32 element_count = 1;
+    // Number of child fields in this NestedGroup
+    optional uint32 child_count = 2;
+    // True if a structure conflict was detected
+    optional bool has_conflict = 3;
 }
 
 message VariantStatisticsPB {
     // in the order of subcolumns in variant
     map<string, uint32> sparse_column_non_null_size = 2;
+
+    // NestedGroup statistics
+    // Key: array path string (e.g., "items", "data.records")
+    map<string, NestedGroupInfoPB> nested_group_info = 3;
 } 
 
 message ColumnMetaPB {


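Finally, a short sketch of how the new messages are populated on the write path, following
the BE changes above; it assumes the headers generated from this file under the usual
doris::segment_v2 package, and fill_nested_group_stats itself is only an illustration:

    #include <gen_cpp/segment_v2.pb.h>

    void fill_nested_group_stats(doris::segment_v2::VariantStatisticsPB* stats) {
        doris::segment_v2::NestedGroupInfoPB info;
        info.set_element_count(6);  // total array elements across all rows
        info.set_child_count(2);    // e.g. children "a" and "b"
        info.set_has_conflict(false);
        (*stats->mutable_nested_group_info())["items"] = info;
    }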