This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 0ef12f0452f branch-4.1: [opt](variant) Reduce sparse variant parse 
memory (#64197)
0ef12f0452f is described below

commit 0ef12f0452fbe2cbed10cbb4eac4e7c239738974
Author: lihangyu <[email protected]>
AuthorDate: Mon Jun 8 17:12:17 2026 +0800

    branch-4.1: [opt](variant) Reduce sparse variant parse memory (#64197)
    
    cherry-pick #63970
---
 be/src/common/config.cpp                           |   6 +
 be/src/common/config.h                             |   3 +
 be/src/exec/common/variant_util.cpp                |  75 +-
 be/src/storage/segment/segment_writer.cpp          |   4 +-
 .../segment/variant/variant_column_writer_impl.cpp | 514 +++++++++--
 .../segment/variant/variant_column_writer_impl.h   |   6 +
 be/test/exec/common/schema_util_test.cpp           |   4 +-
 .../segment/variant_column_writer_reader_test.cpp  | 943 +++++++++++++++++++++
 be/test/storage/segment/variant_util_test.cpp      | 307 ++++++-
 9 files changed, 1749 insertions(+), 113 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5e85a1e1cb6..3660e4368ec 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1133,6 +1133,10 @@ 
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
 DEFINE_mInt32(variant_max_json_key_length, "255");
 DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
 DEFINE_mBool(variant_enable_duplicate_json_path_check, "false");
+// Controls storage-layer parse target for plain non-doc VARIANT columns:
+// 0 = auto, 1 = force parse-time subcolumns, 2 = force doc-value KV staging.
+// NestedGroup, deprecated flatten-nested, and persistent doc mode keep their 
required paths.
+DEFINE_mInt32(variant_storage_parse_mode, "0");
 DEFINE_mBool(enable_vertical_compact_variant_subcolumns, "true");
 DEFINE_mBool(enable_variant_doc_sparse_write_subcolumns, "true");
 // Maximum depth of nested arrays to track with NestedGroup
@@ -1143,6 +1147,8 @@ 
DEFINE_mBool(variant_nested_group_discard_scalar_on_conflict, "false");
 
 DEFINE_Validator(variant_max_json_key_length,
                  [](const int config) -> bool { return config > 0 && config <= 
65535; });
+DEFINE_Validator(variant_storage_parse_mode,
+                 [](const int config) -> bool { return config >= 0 && config 
<= 2; });
 
 // block file cache
 DEFINE_Bool(enable_file_cache, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7935823041b..e40c8d017e1 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1402,6 +1402,9 @@ DECLARE_mInt32(variant_max_json_key_length);
 DECLARE_mBool(variant_throw_exeception_on_invalid_json);
 // Enable duplicate path check when parsing json into variant subcolumns/jsonb.
 DECLARE_mBool(variant_enable_duplicate_json_path_check);
+// Controls storage-layer parse target for plain non-doc VARIANT columns:
+// 0 = auto, 1 = force parse-time subcolumns, 2 = force doc-value KV staging.
+DECLARE_mInt32(variant_storage_parse_mode);
 // Enable vertical compact subcolumns of variant column
 DECLARE_mBool(enable_vertical_compact_variant_subcolumns);
 DECLARE_mBool(enable_variant_doc_sparse_write_subcolumns);
diff --git a/be/src/exec/common/variant_util.cpp 
b/be/src/exec/common/variant_util.cpp
index 232733de723..32dd3cce6f3 100644
--- a/be/src/exec/common/variant_util.cpp
+++ b/be/src/exec/common/variant_util.cpp
@@ -1863,7 +1863,6 @@ void parse_json_to_variant_impl(IColumn& column, const 
char* src, size_t length,
         }
         break;
     case ParseConfig::ParseTo::OnlyDocValueColumn: {
-        CHECK(column_variant.enable_doc_mode()) << "OnlyDocValueColumn 
requires doc mode enabled";
         std::vector<size_t> doc_item_indexes;
         doc_item_indexes.reserve(paths.size());
         phmap::flat_hash_set<StringRef, StringRefHash> seen_paths;
@@ -1873,6 +1872,14 @@ void parse_json_to_variant_impl(IColumn& column, const 
char* src, size_t length,
             FieldInfo field_info;
             get_field_info(values[i], &field_info);
             if (paths[i].empty()) {
+                // Plain non-doc VARIANT can use doc-value KV as writer-side 
staging. An
+                // invalid root entry from JSON object/array is neither a 
scalar root value nor
+                // a doc KV path, so leave this row's doc offset empty. 
Doc-mode and valid scalar
+                // roots still populate the root subcolumn below.
+                if (!column_variant.enable_doc_mode() &&
+                    field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
+                    continue;
+                }
                 auto* subcolumn = column_variant.get_subcolumn(paths[i]);
                 DCHECK(subcolumn != nullptr);
                 flush_defaults(subcolumn);
@@ -2024,10 +2031,15 @@ Status _parse_and_materialize_variant_columns(Block& 
block,
     for (size_t i = 0; i < variant_pos.size(); ++i) {
         auto column_ref = block.get_by_position(variant_pos[i]).column;
         bool is_nullable = column_ref->is_nullable();
-        MutableColumnPtr var_column = column_ref->assume_mutable();
+        MutableColumnPtr owner_column = std::move(*column_ref).mutate();
+        ColumnPtr nullable_null_map;
+        MutableColumnPtr var_column;
         if (is_nullable) {
-            const auto& nullable = assert_cast<const 
ColumnNullable&>(*column_ref);
-            var_column = nullable.get_nested_column_ptr()->assume_mutable();
+            const auto& nullable = assert_cast<const 
ColumnNullable&>(*owner_column);
+            nullable_null_map = nullable.get_null_map_column_ptr();
+            var_column = std::move(*nullable.get_nested_column_ptr()).mutate();
+        } else {
+            var_column = std::move(owner_column);
         }
         auto& var = assert_cast<ColumnVariant&>(*var_column);
         var_column->finalize();
@@ -2077,9 +2089,7 @@ Status _parse_and_materialize_variant_columns(Block& 
block,
         // Wrap variant with nullmap if it is nullable
         ColumnPtr result = variant_column->get_ptr();
         if (is_nullable) {
-            const auto& null_map =
-                    assert_cast<const 
ColumnNullable&>(*column_ref).get_null_map_column_ptr();
-            result = ColumnNullable::create(result, null_map);
+            result = ColumnNullable::create(result, nullable_null_map);
         }
         block.get_by_position(variant_pos[i]).column = result;
     }
@@ -2094,6 +2104,49 @@ Status parse_and_materialize_variant_columns(Block& 
block, const std::vector<uin
             { return _parse_and_materialize_variant_columns(block, 
variant_pos, configs); });
 }
 
+namespace {
+
+ParseConfig::ParseTo select_storage_variant_parse_target(const TabletColumn& 
column,
+                                                         const ParseConfig& 
config) {
+    // NestedGroup consumes the parse-time subcolumn tree to build nested 
storage structures, so it
+    // must not go through doc-value staging.
+    if (column.variant_enable_nested_group()) {
+        return ParseConfig::ParseTo::OnlySubcolumns;
+    }
+
+    // Persistent doc mode owns doc-value bucket columns in VariantDocWriter. 
Keep it separate from
+    // the plain non-doc staging optimization, even when typed paths or parent 
indexes exist.
+    if (column.variant_enable_doc_mode()) {
+        return ParseConfig::ParseTo::OnlyDocValueColumn;
+    }
+
+    // Deprecated flatten-nested still consumes parse-time subcolumns. 
Predefined typed paths and
+    // parent inverted indexes are handled later by regular doc-value staging: 
typed paths are
+    // forced into the materialized set unless typed-to-sparse is enabled, and 
materialized dynamic
+    // subcolumns inherit parent indexes while sparse payloads stay unindexed.
+    if (config.deprecated_enable_flatten_nested) {
+        return ParseConfig::ParseTo::OnlySubcolumns;
+    }
+
+    // Plain dynamic non-doc VARIANT can avoid eagerly creating thousands of 
parse-time subcolumns.
+    // The segment writer will pick the materialized/sparse split from this 
doc-value KV staging.
+    // Keep a BE switch so tests and rollouts can compare the old parse-time 
path with staging under
+    // the same writer and schema.
+    switch (config::variant_storage_parse_mode) {
+    case 0:
+    case 2:
+        return ParseConfig::ParseTo::OnlyDocValueColumn;
+    case 1:
+        return ParseConfig::ParseTo::OnlySubcolumns;
+    default:
+        CHECK(false) << "invalid variant_storage_parse_mode: "
+                     << config::variant_storage_parse_mode;
+        return ParseConfig::ParseTo::OnlyDocValueColumn;
+    }
+}
+
+} // namespace
+
 Status parse_and_materialize_variant_columns(Block& block, const TabletSchema& 
tablet_schema,
                                              const std::vector<uint32_t>& 
column_pos) {
     std::vector<uint32_t> variant_column_pos;
@@ -2124,13 +2177,7 @@ Status parse_and_materialize_variant_columns(Block& 
block, const TabletSchema& t
             return Status::InternalError("column is not variant type, column 
name: {}",
                                          column.name());
         }
-        // if doc mode is not enabled, no need to parse to doc value column
-        if (!column.variant_enable_doc_mode()) {
-            configs[i].parse_to = ParseConfig::ParseTo::OnlySubcolumns;
-            continue;
-        }
-
-        configs[i].parse_to = ParseConfig::ParseTo::OnlyDocValueColumn;
+        configs[i].parse_to = select_storage_variant_parse_target(column, 
configs[i]);
     }
 
     RETURN_IF_ERROR(parse_and_materialize_variant_columns(block, 
variant_column_pos, configs));
diff --git a/be/src/storage/segment/segment_writer.cpp 
b/be/src/storage/segment/segment_writer.cpp
index 426c7c2491e..81540c1dd03 100644
--- a/be/src/storage/segment/segment_writer.cpp
+++ b/be/src/storage/segment/segment_writer.cpp
@@ -1256,11 +1256,11 @@ Status 
SegmentWriter::_generate_short_key_index(std::vector<IOlapColumnDataAcces
     return Status::OK();
 }
 
-inline bool SegmentWriter::_is_mow() {
+bool SegmentWriter::_is_mow() {
     return _tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write;
 }
 
-inline bool SegmentWriter::_is_mow_with_cluster_key() {
+bool SegmentWriter::_is_mow_with_cluster_key() {
     return _is_mow() && !_tablet_schema->cluster_key_uids().empty();
 }
 
diff --git a/be/src/storage/segment/variant/variant_column_writer_impl.cpp 
b/be/src/storage/segment/variant/variant_column_writer_impl.cpp
index 30f7e2b2a73..1dccec87807 100644
--- a/be/src/storage/segment/variant/variant_column_writer_impl.cpp
+++ b/be/src/storage/segment/variant/variant_column_writer_impl.cpp
@@ -23,6 +23,7 @@
 #include <string_view>
 #include <unordered_map>
 #include <unordered_set>
+#include <variant>
 
 #include "common/cast_set.h"
 #include "common/status.h"
@@ -185,6 +186,7 @@ struct SubcolumnSparseData {
 
 using DocSparseSubcolumns = phmap::flat_hash_map<StringRef, 
SubcolumnSparseData, StringRefHash>;
 using DocValuePathStats = phmap::flat_hash_map<StringRef, uint32_t, 
StringRefHash>;
+using DocValuePathSet = phmap::flat_hash_set<StringRef, StringRefHash>;
 
 struct SubcolumnWriteEntry {
     std::string_view path;
@@ -203,8 +205,30 @@ struct SubcolumnWritePlan {
     DocValuePathStats stats;
 };
 
+enum class DocValueMaterializationMode {
+    // Materialize every doc-value path into a full column.
+    DenseAllPaths,
+    // Materialize every doc-value path as values plus row ids; gaps are 
filled during write.
+    RowIdAllPaths,
+    // Materialize only selected doc-value paths as values plus row ids. The 
caller keeps
+    // unselected paths in doc-value or sparse columns.
+    RowIdSelectedPaths,
+};
+
+struct DocValueMaterializationOptions {
+    int64_t min_rows = 0;
+    // Full doc-value path statistics already computed by the caller. It is an 
optimization only:
+    // stats stay scoped to all doc-value paths, never just selected 
materialized paths.
+    const DocValuePathStats* precomputed_stats = nullptr;
+    // Optional materialized path filter. Used by plain non-doc staging after 
choosing which paths
+    // become real subcolumns; the remaining doc-value items are emitted to 
sparse columns.
+    const DocValuePathSet* selected_paths = nullptr;
+};
+
 constexpr size_t kInitialDocPathReserve = 8192;
 
+void release_processed_subcolumn_write_entry(SubcolumnWriteEntry* entry);
+
 // Build per-path non-null counts from the serialized doc-value representation.
 void build_doc_value_stats(const ColumnVariant& variant, DocValuePathStats* 
stats) {
     auto [column_key, column_value] = 
variant.get_doc_value_data_paths_and_values();
@@ -229,6 +253,7 @@ void build_doc_value_stats(const ColumnVariant& variant, 
DocValuePathStats* stat
 // For each row, we decode only present (path, value) pairs and append them to 
the
 // corresponding subcolumn, while recording the row id to allow gap filling 
later.
 void build_sparse_subcolumns(const ColumnVariant& variant, const 
DocValuePathStats& stats,
+                             const DocValuePathSet* selected_paths,
                              DocSparseSubcolumns* subcolumns) {
     auto [column_key, column_value] = 
variant.get_doc_value_data_paths_and_values();
     const auto& column_offsets = variant.serialized_doc_value_column_offsets();
@@ -242,6 +267,9 @@ void build_sparse_subcolumns(const ColumnVariant& variant, 
const DocValuePathSta
         const size_t end = column_offsets[row];
         for (size_t i = start; i < end; ++i) {
             const StringRef path = column_key->get_data_at(i);
+            if (selected_paths != nullptr && !selected_paths->contains(path)) {
+                continue;
+            }
             auto stat_it = stats.find(path);
             DCHECK(stat_it != stats.end());
             auto [data_it, inserted] = subcolumns->try_emplace(path);
@@ -256,39 +284,80 @@ void build_sparse_subcolumns(const ColumnVariant& 
variant, const DocValuePathSta
     }
 }
 
-SubcolumnWritePlan build_subcolumn_write_plan(const ColumnVariant& variant, 
size_t num_rows,
-                                              int64_t 
variant_doc_materialization_min_rows) {
-    SubcolumnWritePlan plan;
-    // Below threshold: skip materialization and let finalize() compute stats 
on demand.
-    if (num_rows < static_cast<size_t>(variant_doc_materialization_min_rows)) {
-        return plan;
+void set_doc_value_stats(const ColumnVariant& variant, const 
DocValuePathStats* precomputed_stats,
+                         DocValuePathStats* stats) {
+    // Plain non-doc staging computes stats before choosing top-N materialized 
paths and reuses them
+    // here. Persistent doc mode has no preselected paths, so stats are built 
on demand.
+    if (precomputed_stats != nullptr) {
+        *stats = *precomputed_stats;
+    } else {
+        build_doc_value_stats(variant, stats);
     }
+}
 
+DocValueMaterializationMode choose_doc_value_materialization_mode(
+        const DocValueMaterializationOptions& options) {
+    // "RowId" means the materialized subcolumn is built from only present 
values plus row ids. It is
+    // different from the final sparse column that stores unmaterialized 
variant paths.
+    if (options.selected_paths != nullptr) {
+        return DocValueMaterializationMode::RowIdSelectedPaths;
+    }
     if (config::enable_variant_doc_sparse_write_subcolumns) {
-        build_doc_value_stats(variant, &plan.stats);
-        build_sparse_subcolumns(variant, plan.stats, &plan.sparse_subcolumns);
-        plan.entries.reserve(plan.sparse_subcolumns.size());
-        for (auto& [path, sparse] : plan.sparse_subcolumns) {
-            SubcolumnWriteEntry entry;
-            // StringRef points to variant storage; valid for the plan's 
lifetime.
-            entry.path = std::string_view(path.data, path.size);
-            entry.subcolumn = &sparse.subcolumn;
-            entry.rowids = &sparse.rowids;
-            plan.entries.push_back(entry);
-        }
-        return plan;
+        return DocValueMaterializationMode::RowIdAllPaths;
+    }
+    return DocValueMaterializationMode::DenseAllPaths;
+}
+
+void append_sparse_write_entries(DocSparseSubcolumns* sparse_subcolumns,
+                                 std::vector<SubcolumnWriteEntry>* entries) {
+    entries->reserve(sparse_subcolumns->size());
+    for (auto& [path, sparse] : *sparse_subcolumns) {
+        SubcolumnWriteEntry entry;
+        // StringRef points to variant storage; valid for the plan's lifetime.
+        entry.path = std::string_view(path.data, path.size);
+        entry.subcolumn = &sparse.subcolumn;
+        entry.rowids = &sparse.rowids;
+        entries->push_back(entry);
     }
+}
 
-    build_doc_value_stats(variant, &plan.stats);
-    plan.dense_subcolumns =
-            variant_util::materialize_docs_to_subcolumns_map(variant, 
plan.stats.size());
-    plan.entries.reserve(plan.dense_subcolumns.size());
-    for (auto& [path, subcolumn] : plan.dense_subcolumns) {
+void append_dense_write_entries(SubcolumnWritePlan::DenseSubcolumns* 
dense_subcolumns,
+                                std::vector<SubcolumnWriteEntry>* entries) {
+    entries->reserve(dense_subcolumns->size());
+    for (auto& [path, subcolumn] : *dense_subcolumns) {
         SubcolumnWriteEntry entry;
         entry.path = path;
         entry.subcolumn = &subcolumn;
         entry.rowids = nullptr;
-        plan.entries.push_back(entry);
+        entries->push_back(entry);
+    }
+}
+
+SubcolumnWritePlan build_subcolumn_write_plan(const ColumnVariant& variant, 
size_t num_rows,
+                                              const 
DocValueMaterializationOptions& options) {
+    SubcolumnWritePlan plan;
+    // Below threshold: skip materialization and let finalize() compute stats 
on demand.
+    if (num_rows < static_cast<size_t>(options.min_rows)) {
+        return plan;
+    }
+
+    set_doc_value_stats(variant, options.precomputed_stats, &plan.stats);
+    switch (choose_doc_value_materialization_mode(options)) {
+    case DocValueMaterializationMode::RowIdSelectedPaths:
+        DCHECK(options.selected_paths != nullptr);
+        build_sparse_subcolumns(variant, plan.stats, options.selected_paths,
+                                &plan.sparse_subcolumns);
+        append_sparse_write_entries(&plan.sparse_subcolumns, &plan.entries);
+        break;
+    case DocValueMaterializationMode::RowIdAllPaths:
+        build_sparse_subcolumns(variant, plan.stats, nullptr, 
&plan.sparse_subcolumns);
+        append_sparse_write_entries(&plan.sparse_subcolumns, &plan.entries);
+        break;
+    case DocValueMaterializationMode::DenseAllPaths:
+        plan.dense_subcolumns =
+                variant_util::materialize_docs_to_subcolumns_map(variant, 
plan.stats.size());
+        append_dense_write_entries(&plan.dense_subcolumns, &plan.entries);
+        break;
     }
     return plan;
 }
@@ -298,10 +367,16 @@ Status execute_doc_write_pipeline(const ColumnVariant& 
variant, size_t num_rows,
                                   int64_t 
variant_doc_materialization_min_rows, int& column_id,
                                   WriteMaterializedFn&& write_materialized_fn,
                                   WriteDocValueFn&& write_doc_value_fn,
-                                  DocValuePathStats* out_column_stats) {
+                                  DocValuePathStats* out_column_stats,
+                                  const DocValuePathStats* precomputed_stats = 
nullptr,
+                                  const DocValuePathSet* selected_paths = 
nullptr) {
     {
-        SubcolumnWritePlan plan =
-                build_subcolumn_write_plan(variant, num_rows, 
variant_doc_materialization_min_rows);
+        DocValueMaterializationOptions options {
+                .min_rows = variant_doc_materialization_min_rows,
+                .precomputed_stats = precomputed_stats,
+                .selected_paths = selected_paths,
+        };
+        SubcolumnWritePlan plan = build_subcolumn_write_plan(variant, 
num_rows, options);
         *out_column_stats = std::move(plan.stats);
         if (out_column_stats->empty()) {
             build_doc_value_stats(variant, out_column_stats);
@@ -309,6 +384,7 @@ Status execute_doc_write_pipeline(const ColumnVariant& 
variant, size_t num_rows,
 
         for (auto& entry : plan.entries) {
             RETURN_IF_ERROR(write_materialized_fn(entry, column_id));
+            release_processed_subcolumn_write_entry(&entry);
         }
     }
     RETURN_IF_ERROR(write_doc_value_fn(column_id));
@@ -324,7 +400,7 @@ Status finish_and_write_column_writer(ColumnWriter* writer) 
{
 void release_processed_subcolumn_write_entry(SubcolumnWriteEntry* entry) {
     DCHECK(entry != nullptr);
     DCHECK(entry->subcolumn != nullptr);
-    ColumnVariant::Subcolumn released_subcolumn;
+    ColumnVariant::Subcolumn released_subcolumn(0, true);
     std::swap(*entry->subcolumn, released_subcolumn);
     if (entry->rowids != nullptr) {
         std::vector<uint32_t> released_rowids;
@@ -383,7 +459,12 @@ Status prepare_materialized_subcolumn_writer(
     std::unique_ptr<ColumnWriter> writer;
     variant_util::inherit_column_attributes(parent_column, tablet_column);
 
-    bool need_record_none_null_value_size = true;
+    const auto& path_info = tablet_column.path_info_ptr();
+    DCHECK(path_info != nullptr);
+    const bool need_record_none_null_value_size =
+            (!path_info->get_is_typed() || 
parent_column.variant_enable_typed_paths_to_sparse()) &&
+            !path_info->has_nested_part() &&
+            variant_util::should_record_variant_path_stats(parent_column);
 
     RETURN_IF_ERROR(_create_column_writer(
             current_column_id, tablet_column, 
base_opts.rowset_ctx->tablet_schema,
@@ -555,6 +636,266 @@ Status append_sparse_converted_column(const TabletColumn& 
tablet_column, ColumnW
     converter->clear_source_content(cid);
     return Status::OK();
 }
+
+bool has_doc_value_data(const ColumnVariant& variant) {
+    if (variant.size() == 0) {
+        return false;
+    }
+    const auto& offsets = variant.serialized_doc_value_column_offsets();
+    return !offsets.empty() && offsets[variant.size() - 1] > 0;
+}
+
+// The variant root column is always written by _process_root_column(). The 
plan below only decides
+// how to write non-root variant data: extracted subcolumns, sparse columns, 
or doc-value buckets.
+// Used when extracted columns already own all non-root variant data.
+struct ExtractedColumnsOwnDataPlan {};
+
+// Used when JSON parse already expanded paths into ColumnVariant subcolumns.
+struct ParseTimeSubcolumnsWritePlan {
+    // True when sparse columns are generated from the parse tree after top-N 
selection.
+    bool write_sparse_columns = false;
+};
+
+// Used when plain non-doc VARIANT arrives as temporary doc-value KV staging.
+struct DocValueStagingWritePlan {};
+
+// Used by persistent doc mode; non-root data is written to doc-value bucket 
columns.
+struct PersistentDocValueWritePlan {};
+
+using VariantNonRootWritePlan =
+        std::variant<ExtractedColumnsOwnDataPlan, ParseTimeSubcolumnsWritePlan,
+                     DocValueStagingWritePlan, PersistentDocValueWritePlan>;
+
+template <typename... Visitors>
+struct Overloaded : Visitors... {
+    using Visitors::operator()...;
+};
+template <typename... Visitors>
+Overloaded(Visitors...) -> Overloaded<Visitors...>;
+
+VariantNonRootWritePlan build_variant_non_root_write_plan(const TabletColumn& 
tablet_column,
+                                                          const ColumnVariant& 
variant,
+                                                          bool 
has_extracted_columns) {
+    if (has_extracted_columns) {
+        return ExtractedColumnsOwnDataPlan {};
+    }
+
+    // Plain non-doc VARIANT may arrive as doc-value KV staging from storage 
parse. The staging data
+    // is internal to this root writer and is converted into materialized 
subcolumns plus sparse
+    // columns.
+    if (!tablet_column.variant_enable_doc_mode() && 
!tablet_column.variant_enable_nested_group() &&
+        has_doc_value_data(variant)) {
+        return DocValueStagingWritePlan {};
+    }
+
+    if (tablet_column.variant_enable_doc_mode()) {
+        return PersistentDocValueWritePlan {};
+    }
+
+    return ParseTimeSubcolumnsWritePlan {
+            .write_sparse_columns =
+                    
variant_util::should_write_variant_binary_columns(tablet_column)};
+}
+
+Status collect_typed_subcolumn_info_from_parse_tree(
+        const ColumnVariant& variant, const TabletColumn& tablet_column,
+        const TabletSchema& tablet_schema,
+        std::unordered_map<std::string, TabletSchema::SubColumnInfo>* 
subcolumns_info) {
+    for (const auto& entry : 
variant_util::get_sorted_subcolumns(variant.get_subcolumns())) {
+        if (entry->path.empty()) {
+            // already handled
+            continue;
+        }
+        // Not supported nested path to generate sub column info, currently
+        if (entry->path.has_nested_part()) {
+            continue;
+        }
+        TabletSchema::SubColumnInfo sub_column_info;
+        if (variant_util::generate_sub_column_info(tablet_schema, 
tablet_column.unique_id(),
+                                                   entry->path.get_path(), 
&sub_column_info)) {
+            subcolumns_info->emplace(entry->path.get_path(), 
std::move(sub_column_info));
+        }
+    }
+    return Status::OK();
+}
+
+Status prepare_parse_time_subcolumns_for_write(
+        ColumnVariant* variant, const TabletColumn& tablet_column,
+        const TabletSchema& tablet_schema,
+        std::unordered_map<std::string, TabletSchema::SubColumnInfo>* 
subcolumns_info) {
+    // Temporary doc-value staging has no parse-time subcolumns, so only 
ParseTimeSubcolumnsWritePlan
+    // reaches this helper. Parse-time paths still need typed-path storage 
conversion before their
+    // writers are created.
+    RETURN_IF_ERROR(collect_typed_subcolumn_info_from_parse_tree(*variant, 
tablet_column,
+                                                                 
tablet_schema, subcolumns_info));
+    
RETURN_IF_ERROR(variant->convert_typed_path_to_storage_type(*subcolumns_info));
+    return Status::OK();
+}
+
+Status prepare_non_root_write_plan_before_write(
+        const VariantNonRootWritePlan& non_root_write_plan, ColumnVariant* 
variant,
+        const TabletColumn& tablet_column, const TabletSchema& tablet_schema,
+        std::unordered_map<std::string, TabletSchema::SubColumnInfo>* 
subcolumns_info) {
+    if 
(std::holds_alternative<ParseTimeSubcolumnsWritePlan>(non_root_write_plan)) {
+        RETURN_IF_ERROR(prepare_parse_time_subcolumns_for_write(variant, 
tablet_column,
+                                                                tablet_schema, 
subcolumns_info));
+    }
+    return Status::OK();
+}
+
+Status prepare_sparse_columns_from_parse_tree(
+        const VariantNonRootWritePlan& non_root_write_plan, ColumnVariant* 
variant,
+        const TabletColumn& tablet_column,
+        const std::unordered_map<std::string, TabletSchema::SubColumnInfo>& 
subcolumns_info) {
+    const auto* parse_time_plan = 
std::get_if<ParseTimeSubcolumnsWritePlan>(&non_root_write_plan);
+    if (parse_time_plan == nullptr || !parse_time_plan->write_sparse_columns) {
+        return Status::OK();
+    }
+    return variant->pick_subcolumns_to_sparse_column(
+            subcolumns_info, 
tablet_column.variant_enable_typed_paths_to_sparse());
+}
+
+struct RegularVariantDocValuePlan {
+    DocValuePathStats stats;
+    DocValuePathSet materialized_paths;
+};
+
+size_t dotted_path_depth(StringRef path) {
+    return static_cast<size_t>(std::count(path.data, path.data + path.size, 
'.'));
+}
+
+Status build_regular_variant_doc_value_plan(
+        const ColumnVariant& variant, const TabletColumn& parent_column,
+        const TabletSchema& tablet_schema,
+        std::unordered_map<std::string, TabletSchema::SubColumnInfo>* 
subcolumns_info,
+        RegularVariantDocValuePlan* plan) {
+    build_doc_value_stats(variant, &plan->stats);
+    if (plan->stats.empty()) {
+        return Status::OK();
+    }
+
+    struct Candidate {
+        StringRef path;
+        uint32_t non_null_count = 0;
+    };
+    std::vector<Candidate> dynamic_candidates;
+    dynamic_candidates.reserve(plan->stats.size());
+    const bool materialize_all_dynamic_paths = 
parent_column.variant_max_subcolumns_count() == 0;
+
+    for (const auto& [path_ref, non_null_count] : plan->stats) {
+        if (path_ref.size == 0 || non_null_count == 0) {
+            continue;
+        }
+        std::string path(path_ref.data, path_ref.size);
+        TabletSchema::SubColumnInfo sub_column_info;
+        const bool is_typed_path = variant_util::generate_sub_column_info(
+                tablet_schema, parent_column.unique_id(), path, 
&sub_column_info);
+        if (is_typed_path) {
+            subcolumns_info->emplace(path, std::move(sub_column_info));
+        }
+        if (is_typed_path && 
!parent_column.variant_enable_typed_paths_to_sparse()) {
+            plan->materialized_paths.emplace(path_ref);
+            continue;
+        }
+        dynamic_candidates.push_back({path_ref, non_null_count});
+    }
+
+    std::sort(dynamic_candidates.begin(), dynamic_candidates.end(),
+              [](const Candidate& lhs, const Candidate& rhs) {
+                  if (lhs.non_null_count != rhs.non_null_count) {
+                      return lhs.non_null_count > rhs.non_null_count;
+                  }
+                  const auto lhs_depth = dotted_path_depth(lhs.path);
+                  const auto rhs_depth = dotted_path_depth(rhs.path);
+                  if (lhs_depth != rhs_depth) {
+                      return lhs_depth > rhs_depth;
+                  }
+                  return std::string_view(lhs.path.data, lhs.path.size) >
+                         std::string_view(rhs.path.data, rhs.path.size);
+              });
+
+    const size_t dynamic_limit =
+            materialize_all_dynamic_paths
+                    ? dynamic_candidates.size()
+                    : 
static_cast<size_t>(parent_column.variant_max_subcolumns_count());
+    for (size_t i = 0; i < std::min(dynamic_limit, dynamic_candidates.size()); 
++i) {
+        plan->materialized_paths.emplace(dynamic_candidates[i].path);
+    }
+    return Status::OK();
+}
+
+Status append_typed_doc_value_to_sparse_column(const ColumnString* doc_values, 
size_t doc_value_pos,
+                                               std::string_view path,
+                                               const DataTypePtr& storage_type,
+                                               ColumnString* sparse_keys,
+                                               ColumnString* sparse_values) {
+    ColumnVariant::Subcolumn subcolumn(0, true, false);
+    subcolumn.deserialize_from_binary_column(doc_values, doc_value_pos);
+    subcolumn.finalize(ColumnVariant::FinalizeMode::WRITE_MODE);
+
+    ColumnPtr current_column = subcolumn.get_finalized_column_ptr()->get_ptr();
+    DataTypePtr current_type = subcolumn.get_least_common_type();
+    if (!storage_type->equals(*current_type)) {
+        RETURN_IF_ERROR(variant_util::cast_column({current_column, 
current_type, ""}, storage_type,
+                                                  &current_column));
+    }
+
+    DataTypePtr sparse_type = storage_type;
+    if (!current_column->is_nullable()) {
+        current_column = make_nullable(current_column);
+        sparse_type = make_nullable(storage_type);
+    }
+
+    auto mutable_column = IColumn::mutate(std::move(current_column));
+    ColumnVariant::Subcolumn typed_subcolumn(std::move(mutable_column), 
sparse_type, true, false);
+    typed_subcolumn.serialize_to_binary_column(sparse_keys, path, 
sparse_values, 0);
+    return Status::OK();
+}
+
+Status build_sparse_column_from_doc_values(
+        const ColumnVariant& variant, const DocValuePathSet& 
materialized_paths,
+        const std::unordered_map<std::string, TabletSchema::SubColumnInfo>& 
typed_paths,
+        size_t num_rows, MutableColumnPtr* result) {
+    auto sparse_column = ColumnVariant::create_binary_column_fn();
+    auto& sparse_map = assert_cast<ColumnMap&>(*sparse_column);
+    auto& sparse_keys = assert_cast<ColumnString&>(sparse_map.get_keys());
+    auto& sparse_values = assert_cast<ColumnString&>(sparse_map.get_values());
+    auto& sparse_offsets = sparse_map.get_offsets();
+    sparse_offsets.reserve(num_rows);
+
+    std::unordered_map<std::string_view, DataTypePtr> typed_storage_types;
+    typed_storage_types.reserve(typed_paths.size());
+    for (const auto& [path, subcolumn_info] : typed_paths) {
+        typed_storage_types.emplace(
+                std::string_view(path.data(), path.size()),
+                
DataTypeFactory::instance().create_data_type(subcolumn_info.column));
+    }
+
+    const auto [doc_keys, doc_values] = 
variant.get_doc_value_data_paths_and_values();
+    const auto& doc_offsets = variant.serialized_doc_value_column_offsets();
+    for (size_t row = 0; row < num_rows; ++row) {
+        const size_t start = doc_offsets[row - 1];
+        const size_t end = doc_offsets[row];
+        for (size_t i = start; i < end; ++i) {
+            const StringRef path = doc_keys->get_data_at(i);
+            if (materialized_paths.contains(path)) {
+                continue;
+            }
+            const std::string_view path_view(path.data, path.size);
+            if (auto typed_it = typed_storage_types.find(path_view);
+                typed_it != typed_storage_types.end()) {
+                RETURN_IF_ERROR(append_typed_doc_value_to_sparse_column(
+                        doc_values, i, path_view, typed_it->second, 
&sparse_keys, &sparse_values));
+            } else {
+                sparse_keys.insert_data(path.data, path.size);
+                sparse_values.insert_from(*doc_values, i);
+            }
+        }
+        sparse_offsets.push_back(sparse_keys.size());
+    }
+    *result = std::move(sparse_column);
+    return Status::OK();
+}
 } // namespace
 
 Status UnifiedSparseColumnWriter::init(const TabletColumn* parent_column, int 
bucket_num,
@@ -699,7 +1040,8 @@ Status 
UnifiedSparseColumnWriter::append_single_sparse(const ColumnVariant& src,
                                                        OlapBlockDataConvertor* 
converter,
                                                        const TabletColumn& 
parent_column) {
     TabletColumn sparse_column = 
variant_util::create_sparse_column(parent_column);
-    converter->add_column_data_convertor(sparse_column);
+    converter->resize(_first_column_id + 1);
+    converter->add_column_data_convertor_at(sparse_column, _first_column_id);
     DCHECK_EQ(src.get_sparse_column()->size(), num_rows);
     RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
             {src.get_sparse_column(), nullptr, ""}, 0, num_rows, 
_first_column_id));
@@ -799,10 +1141,11 @@ Status 
UnifiedSparseColumnWriter::append_bucket_sparse(const ColumnVariant& src,
             bucket_offsets[b]->push_back(bucket_keys[b]->size());
         }
     }
+    converter->resize(_first_column_id + bucket_num);
     for (int b = 0; b < bucket_num; ++b) {
         TabletColumn bucket_col = 
variant_util::create_sparse_shard_column(parent_column, b);
-        converter->add_column_data_convertor(bucket_col);
         int this_col_id = _first_column_id + b;
+        converter->add_column_data_convertor_at(bucket_col, this_col_id);
         RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
                 {tmp_maps[b]->get_ptr(), nullptr, ""}, 0, num_rows, 
this_col_id));
         auto [st, converted] = converter->convert_column_data(this_col_id);
@@ -1330,6 +1673,46 @@ Status 
VariantColumnWriterImpl::_process_subcolumns(ColumnVariant* ptr,
     return Status::OK();
 }
 
+Status VariantColumnWriterImpl::_process_regular_doc_value_staging(
+        ColumnVariant* ptr, OlapBlockDataConvertor* converter, size_t 
num_rows, int& column_id) {
+    DCHECK(!_tablet_column->variant_enable_doc_mode());
+    DCHECK(!_tablet_column->variant_enable_nested_group());
+    RegularVariantDocValuePlan plan;
+    RETURN_IF_ERROR(build_regular_variant_doc_value_plan(
+            *ptr, *_tablet_column, *_opts.rowset_ctx->tablet_schema, 
&_subcolumns_info, &plan));
+
+    DocValuePathStats column_stats;
+    RETURN_IF_ERROR(execute_doc_write_pipeline(
+            *ptr, num_rows, 0 /* materialize selected paths regardless of row 
count */, column_id,
+            [this, num_rows, converter](SubcolumnWriteEntry& entry, int& 
materialized_column_id) {
+                return write_materialized_subcolumn(*_tablet_column, 
entry.path, *entry.subcolumn,
+                                                    num_rows, converter, 
materialized_column_id,
+                                                    _opts, 
&_subcolumns_indexes, &_subcolumn_opts,
+                                                    &_subcolumn_writers, 
entry.rowids);
+            },
+            [this, ptr, num_rows, converter, &plan](int& binary_column_id) {
+                if 
(!variant_util::should_write_variant_binary_columns(*_tablet_column)) {
+                    return Status::OK();
+                }
+
+                MutableColumnPtr sparse_column;
+                RETURN_IF_ERROR(build_sparse_column_from_doc_values(
+                        *ptr, plan.materialized_paths, _subcolumns_info, 
num_rows, &sparse_column));
+                auto sparse_variant = ColumnVariant::create(0, false, 
num_rows);
+                sparse_variant->set_sparse_column(sparse_column->get_ptr());
+
+                _binary_writer = std::make_unique<UnifiedSparseColumnWriter>();
+                const int bucket_num =
+                        std::max(1, 
_tablet_column->variant_sparse_hash_shard_count());
+                RETURN_IF_ERROR(_binary_writer->init(_tablet_column, 
bucket_num, binary_column_id,
+                                                     _opts, _opts.footer));
+                return _binary_writer->append_data(_tablet_column, 
*sparse_variant, num_rows,
+                                                   converter);
+            },
+            &column_stats, &plan.stats, &plan.materialized_paths));
+    return Status::OK();
+}
+
 Status VariantColumnWriterImpl::_process_binary_column(ColumnVariant* ptr,
                                                        OlapBlockDataConvertor* 
converter,
                                                        size_t num_rows, int& 
column_id) {
@@ -1361,30 +1744,16 @@ Status VariantColumnWriterImpl::finalize() {
     auto olap_data_convertor = std::make_unique<OlapBlockDataConvertor>();
 
     DCHECK(ptr->is_finalized());
-
-    for (const auto& entry : 
variant_util::get_sorted_subcolumns(ptr->get_subcolumns())) {
-        if (entry->path.empty()) {
-            // already handled
-            continue;
-        }
-        // Not supported nested path to generate sub column info, currently
-        if (entry->path.has_nested_part()) {
-            continue;
-        }
-        TabletSchema::SubColumnInfo sub_column_info;
-        if 
(variant_util::generate_sub_column_info(*_opts.rowset_ctx->tablet_schema,
-                                                   _tablet_column->unique_id(),
-                                                   entry->path.get_path(), 
&sub_column_info)) {
-            _subcolumns_info.emplace(entry->path.get_path(), 
std::move(sub_column_info));
-        }
-    }
-
-    RETURN_IF_ERROR(ptr->convert_typed_path_to_storage_type(_subcolumns_info));
+    const bool has_extracted_columns = _has_extracted_variant_columns();
+    const VariantNonRootWritePlan non_root_write_plan =
+            build_variant_non_root_write_plan(*_tablet_column, *ptr, 
has_extracted_columns);
+    RETURN_IF_ERROR(prepare_non_root_write_plan_before_write(
+            non_root_write_plan, ptr, *_tablet_column, 
*_opts.rowset_ctx->tablet_schema,
+            &_subcolumns_info));
 
     // Root NG dedup is handled in _process_root_column() — see the
     // has_root_ng check there. We intentionally do NOT modify the in-memory
     // root data here because the legacy NestedGroup prepare path still needs 
it.
-    const bool has_extracted_columns = _has_extracted_variant_columns();
     NestedGroupsMap prebuilt_nested_groups;
     bool has_prebuilt_nested_groups = false;
     _nested_group_routing_plan = NestedGroupRoutingPlan {};
@@ -1398,10 +1767,8 @@ Status VariantColumnWriterImpl::finalize() {
         has_prebuilt_nested_groups = true;
     }
 
-    if (variant_util::should_write_variant_binary_columns(*_tablet_column)) {
-        RETURN_IF_ERROR(ptr->pick_subcolumns_to_sparse_column(
-                _subcolumns_info, 
_tablet_column->variant_enable_typed_paths_to_sparse()));
-    }
+    
RETURN_IF_ERROR(prepare_sparse_columns_from_parse_tree(non_root_write_plan, ptr,
+                                                           *_tablet_column, 
_subcolumns_info));
 
 #ifndef NDEBUG
     ptr->check_consistency();
@@ -1413,19 +1780,29 @@ Status VariantColumnWriterImpl::finalize() {
     // convert root column data from engine format to storage layer format
     RETURN_IF_ERROR(_process_root_column(ptr, olap_data_convertor.get(), 
num_rows, column_id));
 
-    if (!has_extracted_columns) {
-        if (!_tablet_column->variant_enable_doc_mode()) {
-            // process and append each subcolumns to sub columns writers buffer
-            RETURN_IF_ERROR(
-                    _process_subcolumns(ptr, olap_data_convertor.get(), 
num_rows, column_id));
-        }
-
-        if 
(variant_util::should_write_variant_binary_columns(*_tablet_column)) {
-            // process sparse/doc column and append to binary writer buffer
-            RETURN_IF_ERROR(
-                    _process_binary_column(ptr, olap_data_convertor.get(), 
num_rows, column_id));
-        }
-    }
+    RETURN_IF_ERROR(std::visit(
+            Overloaded {[](const ExtractedColumnsOwnDataPlan&) { return 
Status::OK(); },
+                        [this, ptr, &olap_data_convertor, num_rows,
+                         &column_id](const DocValueStagingWritePlan&) {
+                            return _process_regular_doc_value_staging(
+                                    ptr, olap_data_convertor.get(), num_rows, 
column_id);
+                        },
+                        [this, ptr, &olap_data_convertor, num_rows,
+                         &column_id](const PersistentDocValueWritePlan&) {
+                            return _process_binary_column(ptr, 
olap_data_convertor.get(), num_rows,
+                                                          column_id);
+                        },
+                        [this, ptr, &olap_data_convertor, num_rows,
+                         &column_id](const ParseTimeSubcolumnsWritePlan& plan) 
{
+                            RETURN_IF_ERROR(_process_subcolumns(ptr, 
olap_data_convertor.get(),
+                                                                num_rows, 
column_id));
+                            if (plan.write_sparse_columns) {
+                                return _process_binary_column(ptr, 
olap_data_convertor.get(),
+                                                              num_rows, 
column_id);
+                            }
+                            return Status::OK();
+                        }},
+            non_root_write_plan));
 
     // Legacy non-streaming NestedGroup write behavior stays behind 
provider->prepare().
     if (_tablet_column->variant_enable_nested_group()) {
@@ -1892,7 +2269,6 @@ Status VariantDocCompactWriter::finalize() {
                         materialized_column_id, entry.rowids));
                 DCHECK_EQ(_subcolumn_writers.size(), prev_writer_count + 1);
                 
RETURN_IF_ERROR(finish_and_write_column_writer(_subcolumn_writers.back().get()));
-                release_processed_subcolumn_write_entry(&entry);
                 return Status::OK();
             },
             [this, &parent_column, variant_column, &converter, num_rows](int 
doc_value_column_id) {
diff --git a/be/src/storage/segment/variant/variant_column_writer_impl.h 
b/be/src/storage/segment/variant/variant_column_writer_impl.h
index b4100388995..5fc41b54b23 100644
--- a/be/src/storage/segment/variant/variant_column_writer_impl.h
+++ b/be/src/storage/segment/variant/variant_column_writer_impl.h
@@ -194,8 +194,14 @@ private:
     bool _has_extracted_variant_columns() const;
     Status _process_root_column(ColumnVariant* ptr, OlapBlockDataConvertor* 
converter,
                                 size_t num_rows, int& column_id);
+    // Write parse-time subcolumns. This remains the path for nested group, 
legacy flatten nested,
+    // and ordinary VARIANT writes that do not use temporary doc-value staging.
     Status _process_subcolumns(ColumnVariant* ptr, OlapBlockDataConvertor* 
converter,
                                size_t num_rows, int& column_id);
+    // Write plain non-doc VARIANT temporary doc-value staging: selected paths 
become materialized
+    // subcolumns and the remaining paths are emitted to sparse payload 
columns.
+    Status _process_regular_doc_value_staging(ColumnVariant* ptr, 
OlapBlockDataConvertor* converter,
+                                              size_t num_rows, int& column_id);
     Status _process_doc_value_column(ColumnVariant* ptr, 
OlapBlockDataConvertor* converter,
                                      size_t num_rows, int& column_id);
 
diff --git a/be/test/exec/common/schema_util_test.cpp 
b/be/test/exec/common/schema_util_test.cpp
index 8d0da1882c2..e084e546a8d 100644
--- a/be/test/exec/common/schema_util_test.cpp
+++ b/be/test/exec/common/schema_util_test.cpp
@@ -1946,8 +1946,8 @@ TEST_F(SchemaUtilTest, 
parse_and_materialize_variant_columns_ambiguous_paths) {
 
     // Prepare the variant column with the string column as root
     ColumnVariant::Subcolumns dynamic_subcolumns;
-    dynamic_subcolumns.create_root(
-            ColumnVariant::Subcolumn(string_col->assume_mutable(), 
string_type, true));
+    
dynamic_subcolumns.create_root(ColumnVariant::Subcolumn(string_col->assume_mutable(),
+                                                            string_type, true, 
true /*root*/));
 
     auto variant_col = ColumnVariant::create(0, false, 
std::move(dynamic_subcolumns));
     auto variant_type = std::make_shared<DataTypeVariant>();
diff --git a/be/test/storage/segment/variant_column_writer_reader_test.cpp 
b/be/test/storage/segment/variant_column_writer_reader_test.cpp
index 3a644cc373f..6db2dd3bfd0 100644
--- a/be/test/storage/segment/variant_column_writer_reader_test.cpp
+++ b/be/test/storage/segment/variant_column_writer_reader_test.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <atomic>
+#include <cstdlib>
 #include <set>
 #include <thread>
 
@@ -24,6 +25,8 @@
 #include "core/data_type/data_type_string.h"
 #include "core/data_type_serde/data_type_serde.h"
 #include "gtest/gtest.h"
+#include "storage/index/index_file_writer.h"
+#include "storage/index/inverted/inverted_index_desc.h"
 #include "storage/rowset/rowset_factory.h"
 #include "storage/segment/column_meta_accessor.h"
 #include "storage/segment/column_reader.h"
@@ -82,6 +85,30 @@ static void construct_tablet_index(TabletIndexPB* 
tablet_index, int64_t index_id
     tablet_index->add_col_unique_id(col_unique_id);
 }
 
+struct VariantStorageParseWriteResult {
+    size_t num_rows = 0;
+    size_t parsed_subcolumns = 0;
+    size_t parsed_allocated_bytes = 0;
+    size_t doc_value_entries = 0;
+    int footer_columns = 0;
+    int materialized_columns = 0;
+    int sparse_columns = 0;
+    int doc_value_columns = 0;
+    uint64_t segment_file_size = 0;
+};
+
+class ScopedVariantStorageParseMode {
+public:
+    explicit ScopedVariantStorageParseMode(int32_t value)
+            : _old_value(config::variant_storage_parse_mode) {
+        config::variant_storage_parse_mode = value;
+    }
+    ~ScopedVariantStorageParseMode() { config::variant_storage_parse_mode = 
_old_value; }
+
+private:
+    int32_t _old_value;
+};
+
 // MockColumnReaderCache class for testing
 class MockColumnReaderCache : public segment_v2::ColumnReaderCache {
 public:
@@ -208,6 +235,15 @@ protected:
         
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
     }
 
+    void init_tablet_from_current_schema(int64_t tablet_id) {
+        TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));
+        tablet_meta->_tablet_id = tablet_id;
+        _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, 
_data_dir.get());
+        ASSERT_TRUE(_tablet->init().ok());
+        
ASSERT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+        
ASSERT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
+    }
+
     RowsetSharedPtr create_variant_rowset(const 
std::vector<std::vector<std::string>>& batches,
                                           int64_t version, int64_t 
max_rows_per_segment = 200) {
         RowsetWriterContext ctx;
@@ -337,6 +373,189 @@ protected:
         return Status::OK();
     }
 
+    Status read_variant_path_rows(const SegmentFooterPB& footer, const 
std::string& file_path,
+                                  std::string_view relative_path, FieldType 
field_type,
+                                  std::vector<std::string>* out_rows) {
+        io::FileReaderSPtr file_reader;
+        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_path, 
&file_reader));
+
+        std::shared_ptr<ColumnReader> column_reader;
+        RETURN_IF_ERROR(
+                create_variant_root_reader(footer, file_reader, 
_tablet_schema, &column_reader));
+
+        auto* variant_column_reader = 
assert_cast<VariantColumnReader*>(column_reader.get());
+        MockColumnReaderCache column_reader_cache(footer, file_reader, 
_tablet_schema);
+
+        const TabletColumn& parent_column = _tablet_schema->column(0);
+        const std::string full_path =
+                parent_column.name_lower_case() + "." + 
std::string(relative_path);
+        TabletColumn path_column;
+        path_column.set_name(full_path);
+        path_column.set_type(field_type);
+        path_column.set_parent_unique_id(parent_column.unique_id());
+        path_column.set_path_info(PathInData(full_path));
+        path_column.set_is_nullable(true);
+
+        StorageReadOptions storage_read_opts;
+        storage_read_opts.io_ctx.reader_type = ReaderType::READER_QUERY;
+        OlapReaderStatistics stats;
+        storage_read_opts.stats = &stats;
+
+        ColumnIteratorUPtr iterator;
+        RETURN_IF_ERROR(variant_column_reader->new_iterator(
+                &iterator, &path_column, &storage_read_opts, 
&column_reader_cache));
+
+        ColumnIteratorOptions column_iter_opts;
+        column_iter_opts.stats = &stats;
+        column_iter_opts.file_reader = file_reader.get();
+        RETURN_IF_ERROR(iterator->init(column_iter_opts));
+
+        auto data_type = 
DataTypeFactory::instance().create_data_type(path_column, false);
+        MutableColumnPtr dst = data_type->create_column();
+        size_t nrows = footer.num_rows();
+        RETURN_IF_ERROR(iterator->seek_to_ordinal(0));
+        RETURN_IF_ERROR(iterator->next_batch(&nrows, dst));
+
+        out_rows->clear();
+        out_rows->reserve(nrows);
+        for (size_t i = 0; i < nrows; ++i) {
+            out_rows->push_back(data_type->to_string(*dst, i));
+        }
+        return Status::OK();
+    }
+
+    void collect_variant_footer_stats(const SegmentFooterPB& footer, uint64_t 
file_size,
+                                      VariantStorageParseWriteResult* result) {
+        CHECK(result != nullptr);
+        result->footer_columns = footer.columns_size();
+        result->materialized_columns = 0;
+        result->sparse_columns = 0;
+        result->doc_value_columns = 0;
+        result->segment_file_size = file_size;
+        for (int i = 1; i < footer.columns_size(); ++i) {
+            const auto& meta = footer.columns(i);
+            if (!meta.has_column_path_info()) {
+                continue;
+            }
+            PathInData path;
+            path.from_protobuf(meta.column_path_info());
+            const auto base_path = path.copy_pop_front().get_path();
+            if (base_path == "__DORIS_VARIANT_SPARSE__" ||
+                base_path.rfind("__DORIS_VARIANT_SPARSE__.b", 0) == 0) {
+                ++result->sparse_columns;
+            } else if (base_path == "__DORIS_VARIANT_DOC_VALUE__" ||
+                       base_path.rfind("__DORIS_VARIANT_DOC_VALUE__.b", 0) == 
0) {
+                ++result->doc_value_columns;
+            } else {
+                ++result->materialized_columns;
+            }
+        }
+    }
+
+    Status write_storage_parsed_segment(const std::vector<std::string>& jsons,
+                                        std::string_view rowset_id, 
SegmentFooterPB* footer,
+                                        std::string* file_path, bool 
write_inverted_index = false,
+                                        VariantStorageParseWriteResult* result 
= nullptr) {
+        if (footer == nullptr || file_path == nullptr) {
+            return Status::InvalidArgument("footer or file_path is null");
+        }
+        const size_t num_rows = jsons.size();
+        *file_path = local_segment_path(_tablet->tablet_path(), rowset_id, 0);
+        
static_cast<void>(io::global_local_filesystem()->delete_file(*file_path));
+
+        io::FileWriterPtr file_writer;
+        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(*file_path, 
&file_writer));
+
+        std::unique_ptr<segment_v2::IndexFileWriter> index_file_writer;
+        if (_tablet_schema->has_inverted_index()) {
+            const std::string index_path_prefix = std::string(
+                    
segment_v2::InvertedIndexDescriptor::get_index_file_path_prefix(*file_path));
+            io::FileWriterPtr index_v2_file_writer;
+            if (_tablet_schema->get_inverted_index_storage_format() !=
+                InvertedIndexStorageFormatPB::V1) {
+                RETURN_IF_ERROR(io::global_local_filesystem()->create_file(
+                        
segment_v2::InvertedIndexDescriptor::get_index_file_path_v2(
+                                index_path_prefix),
+                        &index_v2_file_writer));
+            }
+            index_file_writer = std::make_unique<segment_v2::IndexFileWriter>(
+                    io::global_local_filesystem(), index_path_prefix, 
std::string(rowset_id),
+                    0 /* seg_id */, 
_tablet_schema->get_inverted_index_storage_format(),
+                    std::move(index_v2_file_writer), true /* can_use_ram_dir 
*/);
+        }
+
+        footer->Clear();
+        RowsetWriterContext rowset_ctx;
+        rowset_ctx.write_type = DataWriteType::TYPE_DIRECT;
+        rowset_ctx.tablet_schema = _tablet_schema;
+        rowset_ctx.tablet = _tablet;
+        rowset_ctx.tablet_path = _tablet->tablet_path();
+
+        TabletColumn parent_column = _tablet_schema->column(0);
+        ColumnWriterOptions opts;
+        opts.meta = footer->add_columns();
+        opts.index_file_writer = index_file_writer.get();
+        opts.compression_type = CompressionTypePB::LZ4;
+        opts.file_writer = file_writer.get();
+        opts.footer = footer;
+        opts.rowset_ctx = &rowset_ctx;
+        _init_column_meta(opts.meta, 0, parent_column, CompressionTypePB::LZ4);
+
+        std::unique_ptr<ColumnWriter> writer;
+        RETURN_IF_ERROR(ColumnWriter::create(opts, &parent_column, 
file_writer.get(), &writer));
+        RETURN_IF_ERROR(writer->init());
+
+        Block block = _tablet_schema->create_block();
+        auto columns = std::move(block).mutate_columns();
+        auto scalar_variant = ColumnVariant::create(0, 
parent_column.variant_enable_doc_mode());
+        for (const auto& json : jsons) {
+            VariantUtil::insert_root_scalar_field(*scalar_variant,
+                                                  
Field::create_field<TYPE_STRING>(String(json)));
+        }
+        columns[0] = std::move(scalar_variant);
+        block.set_columns(std::move(columns));
+
+        RETURN_IF_ERROR(
+                variant_util::parse_and_materialize_variant_columns(block, 
*_tablet_schema, {0}));
+
+        const auto& parsed_variant =
+                assert_cast<const 
ColumnVariant&>(*block.get_by_position(0).column);
+        if (result != nullptr) {
+            result->num_rows = num_rows;
+            result->parsed_subcolumns = parsed_variant.get_subcolumns().size();
+            result->parsed_allocated_bytes = parsed_variant.allocated_bytes();
+            const auto& doc_value_offsets = 
parsed_variant.serialized_doc_value_column_offsets();
+            result->doc_value_entries = doc_value_offsets.empty() ? 0 : 
doc_value_offsets.back();
+        }
+
+        auto converter = std::make_unique<OlapBlockDataConvertor>();
+        converter->add_column_data_convertor(parent_column);
+        converter->set_source_content(&block, 0, num_rows);
+        auto [convert_status, accessor] = converter->convert_column_data(0);
+        RETURN_IF_ERROR(convert_status);
+        RETURN_IF_ERROR(writer->append(accessor->get_nullmap(), 
accessor->get_data(), num_rows));
+
+        RETURN_IF_ERROR(writer->finish());
+        RETURN_IF_ERROR(writer->write_data());
+        RETURN_IF_ERROR(writer->write_ordinal_index());
+        RETURN_IF_ERROR(writer->write_zone_map());
+        if (write_inverted_index) {
+            RETURN_IF_ERROR(writer->write_inverted_index());
+            if (index_file_writer != nullptr) {
+                RETURN_IF_ERROR(index_file_writer->begin_close());
+                RETURN_IF_ERROR(index_file_writer->finish_close());
+            }
+        }
+        RETURN_IF_ERROR(file_writer->close());
+        footer->set_num_rows(num_rows);
+        if (result != nullptr) {
+            int64_t file_size = 0;
+            
RETURN_IF_ERROR(io::global_local_filesystem()->file_size(*file_path, 
&file_size));
+            collect_variant_footer_stats(*footer, 
cast_set<uint64_t>(file_size), result);
+        }
+        return Status::OK();
+    }
+
     TabletSchemaSPtr _tablet_schema = nullptr;
     StorageEngine* _engine_ref = nullptr;
     std::unique_ptr<DataDir> _data_dir = nullptr;
@@ -366,6 +585,36 @@ void check_sparse_column_meta(const ColumnMetaPB& 
column_meta, auto& path_with_s
                 base_path.rfind("__DORIS_VARIANT_SPARSE__.b", 0) == 0);
 }
 
+static const ColumnMetaPB* find_footer_column_meta_by_relative_path(
+        const SegmentFooterPB& footer, std::string_view relative_path) {
+    for (int i = 0; i < footer.columns_size(); ++i) {
+        const auto& column_meta = footer.columns(i);
+        if (!column_meta.has_column_path_info()) {
+            continue;
+        }
+        PathInData path;
+        path.from_protobuf(column_meta.column_path_info());
+        if (path.copy_pop_front().get_path() == relative_path) {
+            return &column_meta;
+        }
+    }
+    return nullptr;
+}
+
+static TabletColumn make_int_typed_path_template(
+        std::string_view path, PatternTypePB pattern_type = 
PatternTypePB::MATCH_NAME) {
+    ColumnPB column_pb;
+    column_pb.set_unique_id(-1);
+    column_pb.set_name(std::string(path));
+    column_pb.set_type("INT");
+    column_pb.set_is_nullable(true);
+    column_pb.set_pattern_type(pattern_type);
+
+    TabletColumn column;
+    column.init_from_pb(column_pb);
+    return column;
+}
+
 static void fill_variant_column_with_doc_value_only(
         MutableColumnPtr& column_object, int num_rows,
         std::unordered_map<int, std::string>* inserted) {
@@ -455,6 +704,44 @@ static std::vector<std::string> normalize_json_rows(const 
std::vector<std::strin
     return normalized;
 }
 
+static void append_variant_json_field(std::string* json, bool* first, 
std::string_view key,
+                                      int64_t value) {
+    if (!*first) {
+        json->push_back(',');
+    }
+    *first = false;
+    json->push_back('"');
+    json->append(key.data(), key.size());
+    json->append("\":");
+    json->append(std::to_string(value));
+}
+
+static std::vector<std::string> make_variant_write_footprint_jsons(size_t 
num_rows,
+                                                                   size_t 
dense_key_count,
+                                                                   size_t 
sparse_keys_per_row,
+                                                                   size_t 
sparse_key_pool) {
+    std::vector<std::string> jsons;
+    jsons.reserve(num_rows);
+    for (size_t row = 0; row < num_rows; ++row) {
+        std::string json;
+        json.reserve((dense_key_count + sparse_keys_per_row) * 18);
+        json.push_back('{');
+        bool first = true;
+        for (size_t i = 0; i < dense_key_count; ++i) {
+            append_variant_json_field(&json, &first, "hot" + std::to_string(i),
+                                      static_cast<int64_t>(row + i));
+        }
+        for (size_t i = 0; i < sparse_keys_per_row; ++i) {
+            const size_t key_id = (row * sparse_keys_per_row + i) % 
sparse_key_pool;
+            append_variant_json_field(&json, &first, "cold" + 
std::to_string(key_id),
+                                      static_cast<int64_t>(row + key_id));
+        }
+        json.push_back('}');
+        jsons.push_back(std::move(json));
+    }
+    return jsons;
+}
+
 // Regression test for legacy flat-dot-key compatibility.
 //
 // Old versions (e.g. cloud-4.1.2 with variant_max_subcolumns_count=0) stored
@@ -2030,6 +2317,662 @@ TEST_F(VariantColumnWriterReaderTest, 
test_write_doc_sparse_write_array_gap_and_
     
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
 }
 
+TEST_F(VariantColumnWriterReaderTest, 
test_storage_parse_kv_write_materialized_and_sparse) {
+    constexpr int kRows = 4;
+
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    construct_column(schema_pb.add_column(), 1, "VARIANT", "V1",
+                     /*variant_max_subcolumns_count=*/2,
+                     /*is_key=*/false,
+                     /*is_nullable=*/false,
+                     /*variant_sparse_hash_shard_count=*/0,
+                     /*variant_enable_doc_mode=*/false);
+    _tablet_schema = std::make_shared<TabletSchema>();
+    _tablet_schema->init_from_pb(schema_pb);
+
+    TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));
+    tablet_meta->_tablet_id = 33003;
+    _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, 
_data_dir.get());
+    ASSERT_TRUE(_tablet->init().ok());
+    
ASSERT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+    
ASSERT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
+
+    io::FileWriterPtr file_writer;
+    auto file_path = local_segment_path(_tablet->tablet_path(), "0", 0);
+    auto st = io::global_local_filesystem()->create_file(file_path, 
&file_writer);
+    ASSERT_TRUE(st.ok()) << st.msg();
+
+    SegmentFooterPB footer;
+    RowsetWriterContext rowset_ctx;
+    rowset_ctx.write_type = DataWriteType::TYPE_DIRECT;
+    rowset_ctx.tablet_schema = _tablet_schema;
+
+    TabletColumn parent_column = _tablet_schema->column(0);
+    ColumnWriterOptions opts;
+    opts.meta = footer.add_columns();
+    opts.compression_type = CompressionTypePB::LZ4;
+    opts.file_writer = file_writer.get();
+    opts.footer = &footer;
+    opts.rowset_ctx = &rowset_ctx;
+    _init_column_meta(opts.meta, 0, parent_column, CompressionTypePB::LZ4);
+
+    std::unique_ptr<ColumnWriter> writer;
+    ASSERT_TRUE(ColumnWriter::create(opts, &parent_column, file_writer.get(), 
&writer).ok());
+    ASSERT_TRUE(writer->init().ok());
+
+    const std::vector<std::string> jsons = {
+            R"({"hot":1,"warm":10,"cold0":100})",
+            R"({"hot":2,"warm":20,"cold1":101})",
+            R"({"hot":3,"warm":30,"cold2":102})",
+            R"({"hot":4,"warm":40,"cold3":103})",
+    };
+
+    Block block = _tablet_schema->create_block();
+    auto columns = std::move(block).mutate_columns();
+    auto scalar_variant = ColumnVariant::create(0, false);
+    for (const auto& json : jsons) {
+        VariantUtil::insert_root_scalar_field(*scalar_variant,
+                                              
Field::create_field<TYPE_STRING>(String(json)));
+    }
+    columns[0] = std::move(scalar_variant);
+    block.set_columns(std::move(columns));
+
+    st = variant_util::parse_and_materialize_variant_columns(block, 
*_tablet_schema, {0});
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    const auto& parsed_variant =
+            assert_cast<const 
ColumnVariant&>(*block.get_by_position(0).column);
+    EXPECT_EQ(parsed_variant.get_subcolumn(PathInData("hot")), nullptr);
+    EXPECT_EQ(parsed_variant.get_subcolumn(PathInData("warm")), nullptr);
+    ASSERT_FALSE(parsed_variant.serialized_doc_value_column_offsets().empty());
+    EXPECT_EQ(parsed_variant.serialized_doc_value_column_offsets().back(), 
kRows * 3);
+
+    auto converter = std::make_unique<OlapBlockDataConvertor>();
+    converter->add_column_data_convertor(parent_column);
+    converter->set_source_content(&block, 0, kRows);
+    auto [convert_status, accessor] = converter->convert_column_data(0);
+    ASSERT_TRUE(convert_status.ok()) << convert_status.to_string();
+    ASSERT_NE(accessor, nullptr);
+    ASSERT_TRUE(writer->append(accessor->get_nullmap(), accessor->get_data(), 
kRows).ok());
+
+    ASSERT_TRUE(writer->finish().ok());
+    ASSERT_TRUE(writer->write_data().ok());
+    ASSERT_TRUE(writer->write_ordinal_index().ok());
+    ASSERT_TRUE(writer->write_zone_map().ok());
+    ASSERT_TRUE(file_writer->close().ok());
+    footer.set_num_rows(kRows);
+
+    EXPECT_EQ(footer.columns_size(), 4);
+
+    io::FileReaderSPtr file_reader;
+    st = io::global_local_filesystem()->open_file(file_path, &file_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    std::shared_ptr<ColumnReader> column_reader;
+    st = create_variant_root_reader(footer, file_reader, _tablet_schema, 
&column_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    auto* variant_column_reader = 
assert_cast<VariantColumnReader*>(column_reader.get());
+    ASSERT_NE(variant_column_reader, nullptr);
+
+    
EXPECT_NE(variant_column_reader->get_subcolumn_meta_by_path(PathInData("hot")), 
nullptr);
+    
EXPECT_NE(variant_column_reader->get_subcolumn_meta_by_path(PathInData("warm")),
 nullptr);
+    for (int i = 0; i < kRows; ++i) {
+        EXPECT_TRUE(variant_column_reader->exist_in_sparse_column(
+                PathInData("cold" + std::to_string(i))));
+    }
+
+    const auto* stats = variant_column_reader->get_stats();
+    ASSERT_NE(stats, nullptr);
+    EXPECT_EQ(stats->subcolumns_non_null_size.at("hot"), kRows);
+    EXPECT_EQ(stats->subcolumns_non_null_size.at("warm"), kRows);
+    for (int i = 0; i < kRows; ++i) {
+        EXPECT_EQ(stats->sparse_column_non_null_size.at("cold" + 
std::to_string(i)), 1);
+    }
+
+    std::vector<std::string> actual_rows;
+    st = read_root_rows(footer, file_path, &actual_rows);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    EXPECT_EQ(actual_rows,
+              normalize_json_rows(jsons, 
parent_column.variant_max_subcolumns_count()));
+
+    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+}
+
+TEST_F(VariantColumnWriterReaderTest,
+       test_storage_parse_kv_write_typed_path_materialized_with_storage_type) {
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    construct_column(schema_pb.add_column(), 1, "VARIANT", "V1",
+                     /*variant_max_subcolumns_count=*/1,
+                     /*is_key=*/false,
+                     /*is_nullable=*/false,
+                     /*variant_sparse_hash_shard_count=*/0,
+                     /*variant_enable_doc_mode=*/false);
+    _tablet_schema = std::make_shared<TabletSchema>();
+    _tablet_schema->init_from_pb(schema_pb);
+
+    auto typed_path = make_int_typed_path_template("typed_i");
+    _tablet_schema->mutable_column_by_uid(1).add_sub_column(typed_path);
+    init_tablet_from_current_schema(33007);
+
+    const std::vector<std::string> jsons = {
+            R"({"typed_i":1,"hot":"a","cold0":100})",
+            R"({"typed_i":2,"hot":"b","cold1":101})",
+            R"({"hot":"c","cold2":102})",
+    };
+
+    SegmentFooterPB footer;
+    std::string file_path;
+    auto st = write_storage_parsed_segment(jsons, "typed_materialized", 
&footer, &file_path);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    const auto* typed_meta = find_footer_column_meta_by_relative_path(footer, 
"typed_i");
+    ASSERT_NE(typed_meta, nullptr);
+    EXPECT_EQ(typed_meta->type(), 
static_cast<int>(FieldType::OLAP_FIELD_TYPE_INT));
+    EXPECT_TRUE(typed_meta->is_nullable());
+    EXPECT_FALSE(typed_meta->has_none_null_size());
+
+    const auto* hot_meta = find_footer_column_meta_by_relative_path(footer, 
"hot");
+    ASSERT_NE(hot_meta, nullptr);
+    EXPECT_EQ(hot_meta->none_null_size(), jsons.size());
+
+    io::FileReaderSPtr file_reader;
+    st = io::global_local_filesystem()->open_file(file_path, &file_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    std::shared_ptr<ColumnReader> column_reader;
+    st = create_variant_root_reader(footer, file_reader, _tablet_schema, 
&column_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    auto* variant_column_reader = 
assert_cast<VariantColumnReader*>(column_reader.get());
+    ASSERT_NE(variant_column_reader, nullptr);
+    
EXPECT_NE(variant_column_reader->get_subcolumn_meta_by_path(PathInData("typed_i")),
 nullptr);
+    
EXPECT_NE(variant_column_reader->get_subcolumn_meta_by_path(PathInData("hot")), 
nullptr);
+    
EXPECT_TRUE(variant_column_reader->exist_in_sparse_column(PathInData("cold0")));
+    
EXPECT_TRUE(variant_column_reader->exist_in_sparse_column(PathInData("cold1")));
+    
EXPECT_TRUE(variant_column_reader->exist_in_sparse_column(PathInData("cold2")));
+    const auto* stats = variant_column_reader->get_stats();
+    ASSERT_NE(stats, nullptr);
+    EXPECT_FALSE(stats->subcolumns_non_null_size.contains("typed_i"));
+    EXPECT_EQ(stats->subcolumns_non_null_size.at("hot"), jsons.size());
+
+    std::vector<std::string> actual_rows;
+    st = read_root_rows(footer, file_path, &actual_rows);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    EXPECT_EQ(actual_rows,
+              normalize_json_rows(jsons, 
_tablet_schema->column(0).variant_max_subcolumns_count()));
+
+    std::vector<std::string> typed_values;
+    st = read_variant_path_rows(footer, file_path, "typed_i", 
FieldType::OLAP_FIELD_TYPE_INT,
+                                &typed_values);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    EXPECT_EQ(typed_values, (std::vector<std::string> {"1", "2", "NULL"}));
+
+    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+}
+
+TEST_F(VariantColumnWriterReaderTest, 
test_storage_parse_kv_write_typed_path_sparse_fallback) {
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    auto* root_pb = schema_pb.add_column();
+    construct_column(root_pb, 1, "VARIANT", "V1",
+                     /*variant_max_subcolumns_count=*/1,
+                     /*is_key=*/false,
+                     /*is_nullable=*/false,
+                     /*variant_sparse_hash_shard_count=*/0,
+                     /*variant_enable_doc_mode=*/false);
+    root_pb->set_variant_enable_typed_paths_to_sparse(true);
+    _tablet_schema = std::make_shared<TabletSchema>();
+    _tablet_schema->init_from_pb(schema_pb);
+
+    auto typed_path = make_int_typed_path_template("typed_i");
+    _tablet_schema->mutable_column_by_uid(1).add_sub_column(typed_path);
+    init_tablet_from_current_schema(33008);
+
+    const std::vector<std::string> jsons = {
+            R"({"typed_i":1,"hot":"a"})",
+            R"({"typed_i":2,"hot":"b"})",
+            R"({"hot":"c"})",
+    };
+
+    SegmentFooterPB footer;
+    std::string file_path;
+    auto st = write_storage_parsed_segment(jsons, "typed_sparse", &footer, 
&file_path);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    EXPECT_EQ(find_footer_column_meta_by_relative_path(footer, "typed_i"), 
nullptr);
+    const auto* hot_meta = find_footer_column_meta_by_relative_path(footer, 
"hot");
+    ASSERT_NE(hot_meta, nullptr);
+    EXPECT_EQ(hot_meta->none_null_size(), jsons.size());
+
+    io::FileReaderSPtr file_reader;
+    st = io::global_local_filesystem()->open_file(file_path, &file_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    std::shared_ptr<ColumnReader> column_reader;
+    st = create_variant_root_reader(footer, file_reader, _tablet_schema, 
&column_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    auto* variant_column_reader = 
assert_cast<VariantColumnReader*>(column_reader.get());
+    ASSERT_NE(variant_column_reader, nullptr);
+    
EXPECT_EQ(variant_column_reader->get_subcolumn_meta_by_path(PathInData("typed_i")),
 nullptr);
+    
EXPECT_NE(variant_column_reader->get_subcolumn_meta_by_path(PathInData("hot")), 
nullptr);
+    
EXPECT_TRUE(variant_column_reader->exist_in_sparse_column(PathInData("typed_i")));
+
+    const auto* stats = variant_column_reader->get_stats();
+    ASSERT_NE(stats, nullptr);
+    EXPECT_EQ(stats->subcolumns_non_null_size.at("hot"), jsons.size());
+    EXPECT_EQ(stats->sparse_column_non_null_size.at("typed_i"), 2);
+
+    std::vector<std::string> actual_rows;
+    st = read_root_rows(footer, file_path, &actual_rows);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    EXPECT_EQ(actual_rows,
+              normalize_json_rows(jsons, 
_tablet_schema->column(0).variant_max_subcolumns_count()));
+
+    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+}
+
+TEST_F(VariantColumnWriterReaderTest,
+       
test_storage_parse_kv_write_glob_typed_path_materialized_with_storage_type) {
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    construct_column(schema_pb.add_column(), 1, "VARIANT", "V1",
+                     /*variant_max_subcolumns_count=*/1,
+                     /*is_key=*/false,
+                     /*is_nullable=*/false,
+                     /*variant_sparse_hash_shard_count=*/0,
+                     /*variant_enable_doc_mode=*/false);
+    _tablet_schema = std::make_shared<TabletSchema>();
+    _tablet_schema->init_from_pb(schema_pb);
+
+    auto typed_path = make_int_typed_path_template("typed_*", 
PatternTypePB::MATCH_NAME_GLOB);
+    _tablet_schema->mutable_column_by_uid(1).add_sub_column(typed_path);
+    init_tablet_from_current_schema(33010);
+
+    const std::vector<std::string> jsons = {
+            R"({"typed_g":1,"hot":"a","cold0":100})",
+            R"({"typed_g":2,"hot":"b","cold1":101})",
+            R"({"hot":"c","cold2":102})",
+    };
+
+    SegmentFooterPB footer;
+    std::string file_path;
+    auto st = write_storage_parsed_segment(jsons, "glob_typed_materialized", 
&footer, &file_path);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    const auto* typed_meta = find_footer_column_meta_by_relative_path(footer, 
"typed_g");
+    ASSERT_NE(typed_meta, nullptr);
+    EXPECT_EQ(typed_meta->type(), 
static_cast<int>(FieldType::OLAP_FIELD_TYPE_INT));
+    EXPECT_TRUE(typed_meta->is_nullable());
+    EXPECT_FALSE(typed_meta->has_none_null_size());
+
+    const auto* hot_meta = find_footer_column_meta_by_relative_path(footer, 
"hot");
+    ASSERT_NE(hot_meta, nullptr);
+    EXPECT_EQ(hot_meta->none_null_size(), jsons.size());
+
+    std::vector<std::string> actual_rows;
+    st = read_root_rows(footer, file_path, &actual_rows);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    EXPECT_EQ(actual_rows,
+              normalize_json_rows(jsons, 
_tablet_schema->column(0).variant_max_subcolumns_count()));
+
+    io::FileReaderSPtr file_reader;
+    st = io::global_local_filesystem()->open_file(file_path, &file_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    std::shared_ptr<ColumnReader> column_reader;
+    st = create_variant_root_reader(footer, file_reader, _tablet_schema, 
&column_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    auto* variant_column_reader = 
assert_cast<VariantColumnReader*>(column_reader.get());
+    ASSERT_NE(variant_column_reader, nullptr);
+    const auto* stats = variant_column_reader->get_stats();
+    ASSERT_NE(stats, nullptr);
+    EXPECT_FALSE(stats->subcolumns_non_null_size.contains("typed_g"));
+    EXPECT_EQ(stats->subcolumns_non_null_size.at("hot"), jsons.size());
+
+    std::vector<std::string> typed_values;
+    st = read_variant_path_rows(footer, file_path, "typed_g", 
FieldType::OLAP_FIELD_TYPE_INT,
+                                &typed_values);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    EXPECT_EQ(typed_values, (std::vector<std::string> {"1", "2", "NULL"}));
+
+    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+}
+
+TEST_F(VariantColumnWriterReaderTest, 
test_storage_parse_kv_write_glob_typed_path_sparse_fallback) {
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    auto* root_pb = schema_pb.add_column();
+    construct_column(root_pb, 1, "VARIANT", "V1",
+                     /*variant_max_subcolumns_count=*/1,
+                     /*is_key=*/false,
+                     /*is_nullable=*/false,
+                     /*variant_sparse_hash_shard_count=*/0,
+                     /*variant_enable_doc_mode=*/false);
+    root_pb->set_variant_enable_typed_paths_to_sparse(true);
+    _tablet_schema = std::make_shared<TabletSchema>();
+    _tablet_schema->init_from_pb(schema_pb);
+
+    auto typed_path = make_int_typed_path_template("typed_*", 
PatternTypePB::MATCH_NAME_GLOB);
+    _tablet_schema->mutable_column_by_uid(1).add_sub_column(typed_path);
+    init_tablet_from_current_schema(33011);
+
+    const std::vector<std::string> jsons = {
+            R"({"typed_g":1,"hot":"a"})",
+            R"({"typed_g":2,"hot":"b"})",
+            R"({"hot":"c"})",
+    };
+
+    SegmentFooterPB footer;
+    std::string file_path;
+    auto st = write_storage_parsed_segment(jsons, "glob_typed_sparse", 
&footer, &file_path);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    EXPECT_EQ(find_footer_column_meta_by_relative_path(footer, "typed_g"), 
nullptr);
+    const auto* hot_meta = find_footer_column_meta_by_relative_path(footer, 
"hot");
+    ASSERT_NE(hot_meta, nullptr);
+    EXPECT_EQ(hot_meta->none_null_size(), jsons.size());
+
+    io::FileReaderSPtr file_reader;
+    st = io::global_local_filesystem()->open_file(file_path, &file_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    std::shared_ptr<ColumnReader> column_reader;
+    st = create_variant_root_reader(footer, file_reader, _tablet_schema, 
&column_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    auto* variant_column_reader = 
assert_cast<VariantColumnReader*>(column_reader.get());
+    ASSERT_NE(variant_column_reader, nullptr);
+    
EXPECT_TRUE(variant_column_reader->exist_in_sparse_column(PathInData("typed_g")));
+
+    const auto* stats = variant_column_reader->get_stats();
+    ASSERT_NE(stats, nullptr);
+    EXPECT_EQ(stats->subcolumns_non_null_size.at("hot"), jsons.size());
+    EXPECT_EQ(stats->sparse_column_non_null_size.at("typed_g"), 2);
+
+    std::vector<std::string> actual_rows;
+    st = read_root_rows(footer, file_path, &actual_rows);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    EXPECT_EQ(actual_rows,
+              normalize_json_rows(jsons, 
_tablet_schema->column(0).variant_max_subcolumns_count()));
+
+    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+}
+
+TEST_F(VariantColumnWriterReaderTest,
+       test_storage_parse_kv_write_parent_index_topn_materialized_only) {
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    
schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);
+    construct_column(schema_pb.add_column(), 1, "VARIANT", "V1",
+                     /*variant_max_subcolumns_count=*/1,
+                     /*is_key=*/false,
+                     /*is_nullable=*/false,
+                     /*variant_sparse_hash_shard_count=*/0,
+                     /*variant_enable_doc_mode=*/false);
+    _tablet_schema = std::make_shared<TabletSchema>();
+    _tablet_schema->init_from_pb(schema_pb);
+
+    TabletIndexPB parent_index_pb;
+    construct_tablet_index(&parent_index_pb, 10007, "idx_v1",
+                           _tablet_schema->column(0).unique_id());
+    TabletIndex parent_index;
+    parent_index.init_from_pb(parent_index_pb);
+    _tablet_schema->append_index(std::move(parent_index));
+    init_tablet_from_current_schema(33009);
+
+    const std::vector<std::string> jsons = {
+            R"({"hot":"a","cold0":"x"})",
+            R"({"hot":"b","cold1":"y"})",
+            R"({"hot":"c","cold2":"z"})",
+    };
+
+    SegmentFooterPB footer;
+    std::string file_path;
+    auto st = write_storage_parsed_segment(jsons, "parent_index", &footer, 
&file_path,
+                                           true /* write_inverted_index */);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    const auto* hot_meta = find_footer_column_meta_by_relative_path(footer, 
"hot");
+    ASSERT_NE(hot_meta, nullptr);
+    EXPECT_EQ(hot_meta->none_null_size(), jsons.size());
+    EXPECT_EQ(find_footer_column_meta_by_relative_path(footer, "cold0"), 
nullptr);
+
+    io::FileReaderSPtr file_reader;
+    st = io::global_local_filesystem()->open_file(file_path, &file_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    std::shared_ptr<ColumnReader> column_reader;
+    st = create_variant_root_reader(footer, file_reader, _tablet_schema, 
&column_reader);
+    ASSERT_TRUE(st.ok()) << st.msg();
+    auto* variant_column_reader = 
assert_cast<VariantColumnReader*>(column_reader.get());
+    ASSERT_NE(variant_column_reader, nullptr);
+    
EXPECT_NE(variant_column_reader->get_subcolumn_meta_by_path(PathInData("hot")), 
nullptr);
+    
EXPECT_TRUE(variant_column_reader->exist_in_sparse_column(PathInData("cold0")));
+    
EXPECT_TRUE(variant_column_reader->exist_in_sparse_column(PathInData("cold1")));
+    
EXPECT_TRUE(variant_column_reader->exist_in_sparse_column(PathInData("cold2")));
+
+    TabletColumn hot_subcolumn;
+    hot_subcolumn.set_name("v1.hot");
+    hot_subcolumn.set_type(FieldType::OLAP_FIELD_TYPE_STRING);
+    hot_subcolumn.set_parent_unique_id(_tablet_schema->column(0).unique_id());
+    hot_subcolumn.set_path_info(PathInData("v1.hot"));
+    hot_subcolumn.set_is_nullable(true);
+    auto indexes = variant_column_reader->find_subcolumn_tablet_indexes(
+            hot_subcolumn, std::make_shared<DataTypeString>());
+    ASSERT_EQ(indexes.size(), 1);
+    EXPECT_EQ(indexes[0]->index_id(), 10007);
+    EXPECT_EQ(indexes[0]->get_index_suffix(), "v1%2Ehot");
+
+    std::vector<std::string> actual_rows;
+    st = read_root_rows(footer, file_path, &actual_rows);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    EXPECT_EQ(actual_rows,
+              normalize_json_rows(jsons, 
_tablet_schema->column(0).variant_max_subcolumns_count()));
+
+    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+}
+
+TEST_F(VariantColumnWriterReaderTest,
+       
test_compaction_schema_excludes_materialized_typed_paths_from_topn_sparse_paths)
 {
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    
schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);
+    construct_column(schema_pb.add_column(), 1, "VARIANT", "V1",
+                     /*variant_max_subcolumns_count=*/1,
+                     /*is_key=*/false,
+                     /*is_nullable=*/false,
+                     /*variant_sparse_hash_shard_count=*/0,
+                     /*variant_enable_doc_mode=*/false);
+    _tablet_schema = std::make_shared<TabletSchema>();
+    _tablet_schema->init_from_pb(schema_pb);
+
+    auto typed_path = make_int_typed_path_template("a");
+    _tablet_schema->mutable_column_by_uid(1).add_sub_column(typed_path);
+
+    TabletIndexPB parent_index_pb;
+    construct_tablet_index(&parent_index_pb, 10008, "idx_v1",
+                           _tablet_schema->column(0).unique_id());
+    TabletIndex parent_index;
+    parent_index.init_from_pb(parent_index_pb);
+    _tablet_schema->append_index(std::move(parent_index));
+    init_tablet_from_current_schema(33012);
+
+    auto rowset = create_variant_rowset({{R"({"a":1,"b":"x"})", 
R"({"a":2,"b":"y","c":"z"})"}}, 1);
+    std::vector<RowsetSharedPtr> input_rowsets {rowset};
+
+    auto compaction_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+    auto st = 
variant_util::VariantCompactionUtil::get_extended_compaction_schema(
+            input_rowsets, compaction_schema);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    const auto* path_set_info = compaction_schema->try_path_set_info(1);
+    ASSERT_NE(path_set_info, nullptr);
+    ASSERT_TRUE(path_set_info->typed_path_set.contains("a"));
+    EXPECT_FALSE(path_set_info->sub_path_set.contains(StringRef("a")));
+    EXPECT_FALSE(path_set_info->sparse_path_set.contains(StringRef("a")));
+    EXPECT_FALSE(path_set_info->subcolumn_indexes.contains("a"));
+    EXPECT_TRUE(path_set_info->sub_path_set.contains(StringRef("b")));
+    EXPECT_TRUE(path_set_info->sparse_path_set.contains(StringRef("c")));
+
+    size_t typed_path_count = 0;
+    size_t dynamic_path_count = 0;
+    size_t sparse_path_count = 0;
+    for (const auto& column : compaction_schema->columns()) {
+        if (!column->is_extracted_column() || column->parent_unique_id() != 1) 
{
+            continue;
+        }
+        const auto relative_path = 
column->path_info_ptr()->copy_pop_front().get_path();
+        if (relative_path == "a") {
+            ++typed_path_count;
+            EXPECT_TRUE(column->path_info_ptr()->get_is_typed());
+        } else if (relative_path == "b") {
+            ++dynamic_path_count;
+            EXPECT_FALSE(column->path_info_ptr()->get_is_typed());
+        } else if (relative_path == "c") {
+            ++sparse_path_count;
+        }
+    }
+    EXPECT_EQ(typed_path_count, 1);
+    EXPECT_EQ(dynamic_path_count, 1);
+    EXPECT_EQ(sparse_path_count, 0);
+
+    const auto& typed_info = path_set_info->typed_path_set.at("a");
+    ASSERT_EQ(typed_info.indexes.size(), 1);
+    EXPECT_EQ(typed_info.indexes[0]->index_id(), 10008);
+    EXPECT_EQ(typed_info.indexes[0]->get_index_suffix(), "v1%2Ea");
+}
+
+TEST_F(VariantColumnWriterReaderTest,
+       
test_doc_value_staging_root_writer_skips_payload_with_extracted_columns) {
+    constexpr int kRows = 2;
+
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    construct_column(schema_pb.add_column(), 1, "VARIANT", "V1",
+                     /*variant_max_subcolumns_count=*/2,
+                     /*is_key=*/false,
+                     /*is_nullable=*/false,
+                     /*variant_sparse_hash_shard_count=*/0,
+                     /*variant_enable_doc_mode=*/false);
+    _tablet_schema = std::make_shared<TabletSchema>();
+    _tablet_schema->init_from_pb(schema_pb);
+
+    TabletColumn parent_column = _tablet_schema->column(0);
+    TabletColumn extracted;
+    extracted.set_name(parent_column.name_lower_case() + ".hot");
+    extracted.set_type(FieldType::OLAP_FIELD_TYPE_BIGINT);
+    extracted.set_parent_unique_id(parent_column.unique_id());
+    extracted.set_path_info(PathInData(parent_column.name_lower_case() + 
".hot"));
+    extracted.set_is_nullable(true);
+    _tablet_schema->append_column(extracted);
+
+    TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));
+    tablet_meta->_tablet_id = 33006;
+    _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, 
_data_dir.get());
+    ASSERT_TRUE(_tablet->init().ok());
+    
ASSERT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+    
ASSERT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
+
+    io::FileWriterPtr file_writer;
+    auto file_path = local_segment_path(_tablet->tablet_path(), "0", 0);
+    auto st = io::global_local_filesystem()->create_file(file_path, 
&file_writer);
+    ASSERT_TRUE(st.ok()) << st.msg();
+
+    SegmentFooterPB footer;
+    RowsetWriterContext rowset_ctx;
+    rowset_ctx.write_type = DataWriteType::TYPE_DIRECT;
+    rowset_ctx.tablet_schema = _tablet_schema;
+
+    ColumnWriterOptions opts;
+    opts.meta = footer.add_columns();
+    opts.compression_type = CompressionTypePB::LZ4;
+    opts.file_writer = file_writer.get();
+    opts.footer = &footer;
+    opts.rowset_ctx = &rowset_ctx;
+    _init_column_meta(opts.meta, 0, parent_column, CompressionTypePB::LZ4);
+
+    std::unique_ptr<ColumnWriter> writer;
+    ASSERT_TRUE(ColumnWriter::create(opts, &parent_column, file_writer.get(), 
&writer).ok());
+    ASSERT_TRUE(writer->init().ok());
+
+    auto strings = ColumnString::create();
+    const std::vector<std::string> jsons = {R"({"hot":1,"cold":10})", 
R"({"hot":2,"cold":20})"};
+    for (const auto& json : jsons) {
+        strings->insert_data(json.data(), json.size());
+    }
+
+    ParseConfig parse_cfg;
+    parse_cfg.deprecated_enable_flatten_nested = false;
+    parse_cfg.parse_to = ParseConfig::ParseTo::OnlyDocValueColumn;
+    auto variant_column =
+            
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), false);
+    variant_util::parse_json_to_variant(*variant_column, *strings, parse_cfg);
+    
ASSERT_FALSE(variant_column->serialized_doc_value_column_offsets().empty());
+    ASSERT_EQ(variant_column->serialized_doc_value_column_offsets().back(), 
kRows * 2);
+
+    auto variant_data = std::make_unique<VariantColumnData>();
+    variant_data->column_data = variant_column.get();
+    variant_data->row_pos = 0;
+    const auto* data = reinterpret_cast<const uint8_t*>(variant_data.get());
+    ASSERT_TRUE(writer->append_data(&data, kRows).ok());
+
+    ASSERT_TRUE(writer->finish().ok());
+    ASSERT_TRUE(writer->write_data().ok());
+    ASSERT_TRUE(writer->write_ordinal_index().ok());
+    ASSERT_TRUE(writer->write_zone_map().ok());
+    ASSERT_TRUE(file_writer->close().ok());
+    footer.set_num_rows(kRows);
+
+    EXPECT_EQ(footer.columns_size(), 1);
+    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+}
+
+TEST_F(VariantColumnWriterReaderTest, 
test_storage_parse_kv_reduces_sparse_parse_write_footprint) {
+    constexpr size_t kRows = 2048;
+    constexpr size_t kDenseKeys = 2;
+    constexpr size_t kSparseKeysPerRow = 30;
+    constexpr size_t kSparseKeyPool = 1000;
+    constexpr size_t kPathsPerRow = kDenseKeys + kSparseKeysPerRow;
+
+    init_variant_tablet(33004, kDenseKeys);
+    const auto jsons = make_variant_write_footprint_jsons(kRows, kDenseKeys, 
kSparseKeysPerRow,
+                                                          kSparseKeyPool);
+
+    VariantStorageParseWriteResult parse_time_result;
+    {
+        ScopedVariantStorageParseMode guard(1);
+        SegmentFooterPB footer;
+        std::string file_path;
+        auto st = write_storage_parsed_segment(jsons, "force_subcolumns", 
&footer, &file_path,
+                                               false, &parse_time_result);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+    }
+
+    VariantStorageParseWriteResult kv_result;
+    {
+        ScopedVariantStorageParseMode guard(2);
+        SegmentFooterPB footer;
+        std::string file_path;
+        auto st = write_storage_parsed_segment(jsons, "force_doc_value", 
&footer, &file_path, false,
+                                               &kv_result);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+    }
+
+    EXPECT_EQ(parse_time_result.num_rows, kRows);
+    EXPECT_EQ(kv_result.num_rows, kRows);
+    EXPECT_EQ(parse_time_result.doc_value_entries, static_cast<size_t>(0));
+    EXPECT_EQ(kv_result.doc_value_entries, kRows * kPathsPerRow);
+    EXPECT_GE(parse_time_result.parsed_subcolumns, kSparseKeyPool);
+    EXPECT_LE(kv_result.parsed_subcolumns, static_cast<size_t>(1));
+    EXPECT_GT(parse_time_result.parsed_subcolumns, 
kv_result.parsed_subcolumns);
+    EXPECT_LT(kv_result.parsed_allocated_bytes, 
parse_time_result.parsed_allocated_bytes);
+
+    // KV staging is only a parse-time shape for plain non-doc VARIANT. The 
writer still emits the
+    // same top-N materialized subcolumns plus sparse fallback, with no 
persistent doc-value column.
+    EXPECT_EQ(parse_time_result.footer_columns, kv_result.footer_columns);
+    EXPECT_EQ(parse_time_result.materialized_columns, 
kv_result.materialized_columns);
+    EXPECT_EQ(parse_time_result.sparse_columns, kv_result.sparse_columns);
+    EXPECT_EQ(parse_time_result.doc_value_columns, 0);
+    EXPECT_EQ(kv_result.doc_value_columns, 0);
+    EXPECT_EQ(kv_result.materialized_columns, static_cast<int>(kDenseKeys));
+    EXPECT_EQ(kv_result.sparse_columns, 1);
+    EXPECT_GT(parse_time_result.segment_file_size, static_cast<uint64_t>(0));
+    EXPECT_GT(kv_result.segment_file_size, static_cast<uint64_t>(0));
+}
+
 TEST_F(VariantColumnWriterReaderTest, test_write_data_advanced) {
     // 1. create tablet_schema
     TabletSchemaPB schema_pb;
diff --git a/be/test/storage/segment/variant_util_test.cpp 
b/be/test/storage/segment/variant_util_test.cpp
index 86bcbd8a668..6e4ad0651e0 100644
--- a/be/test/storage/segment/variant_util_test.cpp
+++ b/be/test/storage/segment/variant_util_test.cpp
@@ -19,6 +19,7 @@
 
 #include <gen_cpp/olap_file.pb.h>
 
+#include <iostream>
 #include <string>
 #include <string_view>
 #include <vector>
@@ -44,6 +45,68 @@ static ColumnString::MutablePtr _make_json_column(const 
std::vector<std::string_
     return col;
 }
 
+static TabletSchema _make_variant_schema(bool enable_doc_mode = false,
+                                         bool enable_nested_group = false) {
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    auto* c = schema_pb.add_column();
+    c->set_unique_id(1);
+    c->set_name("v");
+    c->set_type("VARIANT");
+    c->set_is_key(false);
+    c->set_is_nullable(false);
+    c->set_variant_enable_doc_mode(enable_doc_mode);
+    c->set_variant_enable_nested_group(enable_nested_group);
+
+    TabletSchema tablet_schema;
+    tablet_schema.init_from_pb(schema_pb);
+    return tablet_schema;
+}
+
+static TabletColumn _make_subcolumn_template(
+        const std::string& path, PatternTypePB pattern_type = 
PatternTypePB::MATCH_NAME) {
+    ColumnPB column_pb;
+    column_pb.set_unique_id(-1);
+    column_pb.set_name(path);
+    column_pb.set_type("STRING");
+    column_pb.set_is_nullable(true);
+    column_pb.set_pattern_type(pattern_type);
+
+    TabletColumn column;
+    column.init_from_pb(column_pb);
+    return column;
+}
+
+static void _append_parent_inverted_index(TabletSchema* tablet_schema, int64_t 
index_id = 10001) {
+    TabletIndexPB index_pb;
+    index_pb.set_index_id(index_id);
+    index_pb.set_index_name("idx_v");
+    index_pb.set_index_type(IndexType::INVERTED);
+    index_pb.add_col_unique_id(1);
+
+    TabletIndex tablet_index;
+    tablet_index.init_from_pb(index_pb);
+    tablet_schema->append_index(std::move(tablet_index));
+}
+
+static Block _make_scalar_variant_block(const std::vector<std::string>& jsons,
+                                        bool enable_doc_mode = false) {
+    auto variant = ColumnVariant::create(0, enable_doc_mode);
+    for (const auto& json : jsons) {
+        doris::VariantUtil::insert_root_scalar_field(
+                *variant, Field::create_field<TYPE_STRING>(String(json)));
+    }
+
+    Block block;
+    block.insert({variant->get_ptr(), std::make_shared<DataTypeVariant>(0, 
enable_doc_mode), "v"});
+    return block;
+}
+
+static size_t _doc_value_entry_count(const ColumnVariant& variant) {
+    const auto& offsets = variant.serialized_doc_value_column_offsets();
+    return offsets.empty() ? 0 : offsets.back();
+}
+
 class ScopedDuplicateJsonPathCheck {
 public:
     explicit ScopedDuplicateJsonPathCheck(bool value)
@@ -58,6 +121,18 @@ private:
     bool _old_value;
 };
 
+class ScopedVariantStorageParseMode {
+public:
+    explicit ScopedVariantStorageParseMode(int32_t value)
+            : _old_value(config::variant_storage_parse_mode) {
+        config::variant_storage_parse_mode = value;
+    }
+    ~ScopedVariantStorageParseMode() { config::variant_storage_parse_mode = 
_old_value; }
+
+private:
+    int32_t _old_value;
+};
+
 TEST(VariantUtilTest, ParseDocValueToSubcolumns_FillsDefaultsAndValues) {
     const std::vector<std::string_view> jsons = {
             R"({"a":1,"b":"x"})", //
@@ -431,29 +506,10 @@ TEST(VariantUtilTest, 
ParseDuplicateJsonPathsCheckDisabledByDefault) {
     EXPECT_THROW(parse_json_to_variant(*variant, *json_col, cfg), Exception);
 }
 
-TEST(VariantUtilTest, ParseVariantColumns_ScalarJsonStringToSubcolumns) {
-    TabletSchemaPB schema_pb;
-    schema_pb.set_keys_type(KeysType::DUP_KEYS);
-    auto* c = schema_pb.add_column();
-    c->set_unique_id(1);
-    c->set_name("v");
-    c->set_type("VARIANT");
-    c->set_is_key(false);
-    c->set_is_nullable(false);
-    // doc mode disabled
-    c->set_variant_enable_doc_mode(false);
-
-    TabletSchema tablet_schema;
-    tablet_schema.init_from_pb(schema_pb);
-
-    auto variant = ColumnVariant::create(0, false);
-    doris::VariantUtil::insert_root_scalar_field(
-            *variant, Field::create_field<TYPE_STRING>(String(R"({"a":1})")));
-    doris::VariantUtil::insert_root_scalar_field(
-            *variant, Field::create_field<TYPE_STRING>(String(R"({"a":2})")));
-
-    Block block;
-    block.insert({variant->get_ptr(), std::make_shared<DataTypeVariant>(0, 
false), "v"});
+TEST(VariantUtilTest, ParseVariantColumns_StorageNonDocScalarJsonToDocValueKv) 
{
+    TabletSchema tablet_schema = _make_variant_schema(false);
+    std::vector<std::string> jsons {R"({"a":1})", R"({"a":2})"};
+    Block block = _make_scalar_variant_block(jsons);
 
     const std::vector<uint32_t> column_pos {0};
     Status st = parse_and_materialize_variant_columns(block, tablet_schema, 
column_pos);
@@ -463,16 +519,215 @@ TEST(VariantUtilTest, 
ParseVariantColumns_ScalarJsonStringToSubcolumns) {
     const auto& out = assert_cast<const ColumnVariant&>(col0);
 
     const auto* sub_a = out.get_subcolumn(PathInData("a"));
-    ASSERT_TRUE(sub_a != nullptr);
+    ASSERT_TRUE(sub_a == nullptr);
+
+    const auto& doc_offsets = out.serialized_doc_value_column_offsets();
+    ASSERT_EQ(doc_offsets.size(), jsons.size());
+    EXPECT_EQ(doc_offsets.back(), jsons.size());
+
+    auto docs_subcolumns = materialize_docs_to_subcolumns_map(out);
+    ASSERT_TRUE(docs_subcolumns.contains("a"));
+    auto& materialized_a = docs_subcolumns.at("a");
+    materialized_a.finalize();
+
     FieldWithDataType f;
-    sub_a->get(0, f);
+    materialized_a.get(0, f);
     EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_BIGINT);
     EXPECT_EQ(f.field.get<TYPE_BIGINT>(), 1);
-    sub_a->get(1, f);
+    materialized_a.get(1, f);
     EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_BIGINT);
     EXPECT_EQ(f.field.get<TYPE_BIGINT>(), 2);
 }
 
+TEST(VariantUtilTest, ParseVariantColumns_StorageTypedPathUsesDocValueKv) {
+    TabletSchema tablet_schema = _make_variant_schema(false);
+    auto typed_path = _make_subcolumn_template("a");
+    tablet_schema.mutable_column_by_uid(1).add_sub_column(typed_path);
+
+    Block block = _make_scalar_variant_block({R"({"a":"x","b":"y"})"});
+    Status st = parse_and_materialize_variant_columns(block, tablet_schema, 
{0});
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    const auto& out = assert_cast<const 
ColumnVariant&>(*block.get_by_position(0).column);
+    EXPECT_EQ(out.get_subcolumn(PathInData("a")), nullptr);
+    EXPECT_EQ(out.get_subcolumn(PathInData("b")), nullptr);
+    ASSERT_EQ(out.serialized_doc_value_column_offsets().size(), 1);
+    EXPECT_EQ(out.serialized_doc_value_column_offsets().back(), 2);
+
+    auto docs_subcolumns = materialize_docs_to_subcolumns_map(out);
+    EXPECT_TRUE(docs_subcolumns.contains("a"));
+    EXPECT_TRUE(docs_subcolumns.contains("b"));
+}
+
+TEST(VariantUtilTest, ParseVariantColumns_StorageParentIndexUsesDocValueKv) {
+    TabletSchema tablet_schema = _make_variant_schema(false);
+    _append_parent_inverted_index(&tablet_schema);
+
+    Block block = _make_scalar_variant_block({R"({"a":"x","b":"y"})"});
+    Status st = parse_and_materialize_variant_columns(block, tablet_schema, 
{0});
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    const auto& out = assert_cast<const 
ColumnVariant&>(*block.get_by_position(0).column);
+    EXPECT_EQ(out.get_subcolumn(PathInData("a")), nullptr);
+    EXPECT_EQ(out.get_subcolumn(PathInData("b")), nullptr);
+    ASSERT_EQ(out.serialized_doc_value_column_offsets().size(), 1);
+    EXPECT_EQ(out.serialized_doc_value_column_offsets().back(), 2);
+
+    auto docs_subcolumns = materialize_docs_to_subcolumns_map(out);
+    EXPECT_TRUE(docs_subcolumns.contains("a"));
+    EXPECT_TRUE(docs_subcolumns.contains("b"));
+}
+
+TEST(VariantUtilTest, 
ParseVariantColumns_DocModeKeepsDocValueWithTypedPathAndParentIndex) {
+    TabletSchema tablet_schema = _make_variant_schema(true);
+    auto typed_path = _make_subcolumn_template("a");
+    tablet_schema.mutable_column_by_uid(1).add_sub_column(typed_path);
+    _append_parent_inverted_index(&tablet_schema);
+
+    Block block = _make_scalar_variant_block({R"({"a":"x","b":"y"})"}, true);
+    Status st = parse_and_materialize_variant_columns(block, tablet_schema, 
{0});
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    const auto& out = assert_cast<const 
ColumnVariant&>(*block.get_by_position(0).column);
+    EXPECT_EQ(out.get_subcolumn(PathInData("a")), nullptr);
+    EXPECT_EQ(out.get_subcolumn(PathInData("b")), nullptr);
+    ASSERT_EQ(out.serialized_doc_value_column_offsets().size(), 1);
+    EXPECT_EQ(out.serialized_doc_value_column_offsets().back(), 2);
+
+    auto docs_subcolumns = materialize_docs_to_subcolumns_map(out);
+    EXPECT_TRUE(docs_subcolumns.contains("a"));
+    EXPECT_TRUE(docs_subcolumns.contains("b"));
+}
+
+TEST(VariantUtilTest, 
ParseVariantColumns_StorageParseModeConfigControlsPlainNonDocPath) {
+    TabletSchema tablet_schema = _make_variant_schema(false);
+
+    {
+        ScopedVariantStorageParseMode parse_time_guard(1);
+        Block block = _make_scalar_variant_block({R"({"a":"x","b":"y"})"});
+        Status st = parse_and_materialize_variant_columns(block, 
tablet_schema, {0});
+        ASSERT_TRUE(st.ok()) << st.to_string();
+
+        const auto& out = assert_cast<const 
ColumnVariant&>(*block.get_by_position(0).column);
+        EXPECT_EQ(_doc_value_entry_count(out), static_cast<size_t>(0));
+        EXPECT_NE(out.get_subcolumn(PathInData("a")), nullptr);
+        EXPECT_NE(out.get_subcolumn(PathInData("b")), nullptr);
+    }
+
+    {
+        ScopedVariantStorageParseMode doc_value_guard(2);
+        Block block = _make_scalar_variant_block({R"({"a":"x","b":"y"})"});
+        Status st = parse_and_materialize_variant_columns(block, 
tablet_schema, {0});
+        ASSERT_TRUE(st.ok()) << st.to_string();
+
+        const auto& out = assert_cast<const 
ColumnVariant&>(*block.get_by_position(0).column);
+        EXPECT_EQ(out.get_subcolumn(PathInData("a")), nullptr);
+        EXPECT_EQ(out.get_subcolumn(PathInData("b")), nullptr);
+        EXPECT_EQ(_doc_value_entry_count(out), static_cast<size_t>(2));
+    }
+}
+
+TEST(VariantUtilTest, SparseStorageParseUsesDocValueKvInsteadOfManySubcolumns) 
{
+    constexpr int kRows = 1000;
+    std::vector<std::string> jsons;
+    jsons.reserve(kRows);
+    for (int i = 0; i < kRows; ++i) {
+        jsons.push_back("{\"k" + std::to_string(i) + "\":\"" + 
std::to_string(i) + "\"}");
+    }
+
+    ParseConfig old_parse_cfg;
+    old_parse_cfg.deprecated_enable_flatten_nested = false;
+    old_parse_cfg.parse_to = ParseConfig::ParseTo::OnlySubcolumns;
+
+    Block old_block = _make_scalar_variant_block(jsons);
+    Status old_st = parse_and_materialize_variant_columns(old_block, 
std::vector<uint32_t> {0},
+                                                          {old_parse_cfg});
+    ASSERT_TRUE(old_st.ok()) << old_st.to_string();
+    const auto& old_variant =
+            assert_cast<const 
ColumnVariant&>(*old_block.get_by_position(0).column);
+
+    TabletSchema tablet_schema = _make_variant_schema(false);
+    Block new_block = _make_scalar_variant_block(jsons);
+    Status new_st = parse_and_materialize_variant_columns(new_block, 
tablet_schema, {0});
+    ASSERT_TRUE(new_st.ok()) << new_st.to_string();
+    const auto& new_variant =
+            assert_cast<const 
ColumnVariant&>(*new_block.get_by_position(0).column);
+
+    const size_t old_subcolumns = old_variant.get_subcolumns().size();
+    const size_t new_subcolumns = new_variant.get_subcolumns().size();
+    const size_t old_bytes = old_variant.allocated_bytes();
+    const size_t new_bytes = new_variant.allocated_bytes();
+
+    std::cout << "sparse variant parse memory old_subcolumns=" << 
old_subcolumns
+              << " new_subcolumns=" << new_subcolumns << " old_bytes=" << 
old_bytes
+              << " new_bytes=" << new_bytes << std::endl;
+
+    EXPECT_GE(old_subcolumns, static_cast<size_t>(kRows));
+    EXPECT_LE(new_subcolumns, static_cast<size_t>(1));
+    EXPECT_LT(new_bytes, old_bytes);
+
+    const auto& doc_offsets = 
new_variant.serialized_doc_value_column_offsets();
+    ASSERT_EQ(doc_offsets.size(), kRows);
+    EXPECT_EQ(doc_offsets.back(), kRows);
+}
+
+TEST(VariantUtilTest, 
ParseVariantColumns_StorageNonDocDocValueKvSkipsInvalidRoot) {
+    TabletSchema tablet_schema = _make_variant_schema(false);
+    Block invalid_root_block = _make_scalar_variant_block({R"([])"});
+
+    Status st = parse_and_materialize_variant_columns(invalid_root_block, 
tablet_schema, {0});
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    const auto& invalid_root_variant =
+            assert_cast<const 
ColumnVariant&>(*invalid_root_block.get_by_position(0).column);
+    EXPECT_TRUE(invalid_root_variant.is_null_root());
+    
ASSERT_EQ(invalid_root_variant.serialized_doc_value_column_offsets().size(), 1);
+    
EXPECT_EQ(invalid_root_variant.serialized_doc_value_column_offsets().back(), 0);
+
+    Block scalar_root_block = _make_scalar_variant_block({R"(100)"});
+    st = parse_and_materialize_variant_columns(scalar_root_block, 
tablet_schema, {0});
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    const auto& scalar_root_variant =
+            assert_cast<const 
ColumnVariant&>(*scalar_root_block.get_by_position(0).column);
+    ASSERT_TRUE(scalar_root_variant.is_scalar_variant());
+    DataTypeSerDe::FormatOptions options;
+    std::string value;
+    scalar_root_variant.serialize_one_row_to_string(0, &value, options);
+    EXPECT_EQ(value, "100");
+}
+
+TEST(VariantUtilTest, ParseNullableScalarVariantDetachesNestedAlias) {
+    auto variant = ColumnVariant::create(0, false);
+    doris::VariantUtil::insert_root_scalar_field(*variant, 
Field::create_field<TYPE_INT>(123));
+    ColumnPtr variant_ptr = std::move(variant);
+
+    auto null_map = ColumnUInt8::create();
+    null_map->insert_value(0);
+    ColumnPtr nullable_variant = ColumnNullable::create(variant_ptr, 
null_map->get_ptr());
+    variant_ptr.reset();
+    ColumnPtr nullable_alias = nullable_variant;
+
+    Block block;
+    block.insert(
+            {nullable_variant, 
make_nullable(std::make_shared<DataTypeVariant>(0, false)), "v"});
+
+    ParseConfig parse_cfg;
+    parse_cfg.deprecated_enable_flatten_nested = false;
+    parse_cfg.parse_to = ParseConfig::ParseTo::OnlySubcolumns;
+    Status st =
+            parse_and_materialize_variant_columns(block, std::vector<uint32_t> 
{0}, {parse_cfg});
+    EXPECT_TRUE(st.ok()) << st.to_string();
+
+    const auto& alias_nullable = assert_cast<const 
ColumnNullable&>(*nullable_alias);
+    const auto& alias_variant =
+            assert_cast<const 
ColumnVariant&>(alias_nullable.get_nested_column());
+    EXPECT_TRUE(alias_variant.is_scalar_variant());
+    EXPECT_EQ(alias_variant.get_root_type()->get_primitive_type(), 
PrimitiveType::TYPE_INT);
+
+    EXPECT_TRUE(block.get_by_position(0).column->is_nullable());
+}
+
 TEST(VariantUtilTest, ParseVariantColumns_DocModeBinaryToSubcolumns) {
     const std::vector<std::string_view> jsons = {
             R"({"a":1,"b":"x"})", //


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to