This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d57851c8743 [fix](variant) reduce doc compaction writer peak memory (#61955)
d57851c8743 is described below

commit d57851c8743e3d37e95a2159ec066cf240ce19bc
Author: lihangyu <[email protected]>
AuthorDate: Wed Apr 1 11:00:52 2026 +0800

    [fix](variant) reduce doc compaction writer peak memory (#61955)
    
    - cap the initial doc-value path reserve, and reuse the exact unique-path
    count when it is available
    - release the doc write plan before writing doc values so the two memory
    peaks do not overlap
    - finish and flush each compact-writer subcolumn as soon as it is written,
    and make finish()/write_data() idempotent
    - skip the cross-bucket merge/sort when there is only one bucket
    - add unit tests for expected_unique_paths and for compact-writer
    round-trip and idempotence
    
    In the chart below, the blue line is the improved version.
    
    <img width="1578" height="570" alt="image" src="https://github.com/user-attachments/assets/eb501829-627c-45e8-85a1-93c63d7148de" />
---
 be/src/exec/common/variant_util.cpp                |  10 +-
 be/src/exec/common/variant_util.h                  |   2 +-
 .../segment/variant/binary_column_reader.cpp       |  12 +-
 .../segment/variant/variant_column_writer_impl.cpp |  96 +++--
 .../segment/variant/variant_column_writer_impl.h   |   1 +
 .../segment/variant_column_writer_reader_test.cpp  | 386 +++++++++++----------
 be/test/storage/segment/variant_util_test.cpp      |  61 ++++
 7 files changed, 343 insertions(+), 225 deletions(-)

diff --git a/be/src/exec/common/variant_util.cpp 
b/be/src/exec/common/variant_util.cpp
index 73a31b95c7b..e2bede96fbf 100644
--- a/be/src/exec/common/variant_util.cpp
+++ b/be/src/exec/common/variant_util.cpp
@@ -2052,7 +2052,8 @@ void materialize_docs_to_subcolumns(ColumnVariant& 
column_variant) {
 // ============ Implementation from variant_util.cpp ============
 
 phmap::flat_hash_map<std::string_view, ColumnVariant::Subcolumn> 
materialize_docs_to_subcolumns_map(
-        const ColumnVariant& variant) {
+        const ColumnVariant& variant, size_t expected_unique_paths) {
+    constexpr size_t kInitialPathReserve = 8192;
     phmap::flat_hash_map<std::string_view, ColumnVariant::Subcolumn> 
subcolumns;
 
     const auto [column_key, column_value] = 
variant.get_doc_value_data_paths_and_values();
@@ -2061,11 +2062,12 @@ phmap::flat_hash_map<std::string_view, 
ColumnVariant::Subcolumn> materialize_doc
 
     DCHECK_EQ(num_rows, variant.size()) << "doc snapshot offsets size mismatch 
with variant rows";
 
-    // Best-effort reserve: at most number of kv pairs.
-    subcolumns.reserve(column_key->size());
+    subcolumns.reserve(expected_unique_paths != 0
+                               ? expected_unique_paths
+                               : std::min<size_t>(column_key->size(), 
kInitialPathReserve));
 
     for (size_t row = 0; row < num_rows; ++row) {
-        const size_t start = (row == 0) ? 0 : column_offsets[row - 1];
+        const size_t start = column_offsets[row - 1];
         const size_t end = column_offsets[row];
         for (size_t i = start; i < end; ++i) {
             const auto& key = column_key->get_data_at(i);
diff --git a/be/src/exec/common/variant_util.h 
b/be/src/exec/common/variant_util.h
index dda580154b6..f4302146972 100644
--- a/be/src/exec/common/variant_util.h
+++ b/be/src/exec/common/variant_util.h
@@ -265,6 +265,6 @@ Status parse_and_materialize_variant_columns(Block& block, 
const TabletSchema& t
 // NOTE: Returned map keys are `std::string_view` pointing into the underlying 
doc snapshot paths
 // column, so the input `variant` must outlive the returned map.
 phmap::flat_hash_map<std::string_view, ColumnVariant::Subcolumn> 
materialize_docs_to_subcolumns_map(
-        const ColumnVariant& variant);
+        const ColumnVariant& variant, size_t expected_unique_paths = 0);
 
 } // namespace  doris::variant_util
diff --git a/be/src/storage/segment/variant/binary_column_reader.cpp 
b/be/src/storage/segment/variant/binary_column_reader.cpp
index fb9335b73c6..96de6c67507 100644
--- a/be/src/storage/segment/variant/binary_column_reader.cpp
+++ b/be/src/storage/segment/variant/binary_column_reader.cpp
@@ -107,6 +107,16 @@ BinaryColumnType SingleSparseColumnReader::get_type() 
const {
 }
 
 Status 
MultipleBinaryColumnReader::new_binary_column_iterator(ColumnIteratorUPtr* 
iter) const {
+    // Single bucket can be read directly without cross-bucket merge/sort.
+    if (_multiple_column_readers.size() == 1) {
+        DCHECK(!_multiple_column_readers.empty());
+        auto it = _multiple_column_readers.begin();
+        ColumnIteratorUPtr single_iter;
+        RETURN_IF_ERROR(it->second->new_iterator(&single_iter, nullptr));
+        *iter = std::move(single_iter);
+        return Status::OK();
+    }
+
     std::vector<std::unique_ptr<ColumnIterator>> iters;
     iters.reserve(_multiple_column_readers.size());
     for (const auto& [index, reader] : _multiple_column_readers) {
@@ -268,4 +278,4 @@ void 
CombineMultipleBinaryColumnIterator::_collect_sparse_data_from_buckets(
 }
 
 #include "common/compile_check_end.h"
-} // namespace doris::segment_v2
\ No newline at end of file
+} // namespace doris::segment_v2
diff --git a/be/src/storage/segment/variant/variant_column_writer_impl.cpp 
b/be/src/storage/segment/variant/variant_column_writer_impl.cpp
index 4f87a1640b3..f70d1565788 100644
--- a/be/src/storage/segment/variant/variant_column_writer_impl.cpp
+++ b/be/src/storage/segment/variant/variant_column_writer_impl.cpp
@@ -190,7 +190,7 @@ struct SubcolumnWriteEntry {
     std::string_view path;
     ColumnVariant::Subcolumn* subcolumn = nullptr;
     // nullptr means dense materialization; otherwise sparse row ids for this 
path.
-    const std::vector<uint32_t>* rowids = nullptr;
+    std::vector<uint32_t>* rowids = nullptr;
 };
 
 struct SubcolumnWritePlan {
@@ -203,6 +203,8 @@ struct SubcolumnWritePlan {
     DocValuePathStats stats;
 };
 
+constexpr size_t kInitialDocPathReserve = 8192;
+
 // Build per-path non-null counts from the serialized doc-value representation.
 void build_doc_value_stats(const ColumnVariant& variant, DocValuePathStats* 
stats) {
     auto [column_key, column_value] = 
variant.get_doc_value_data_paths_and_values();
@@ -211,7 +213,7 @@ void build_doc_value_stats(const ColumnVariant& variant, 
DocValuePathStats* stat
     const size_t num_rows = column_offsets.size();
 
     stats->clear();
-    stats->reserve(column_key->size());
+    stats->reserve(std::min<size_t>(column_key->size(), 
kInitialDocPathReserve));
     for (size_t row = 0; row < num_rows; ++row) {
         const size_t start = column_offsets[row - 1];
         const size_t end = column_offsets[row];
@@ -223,35 +225,35 @@ void build_doc_value_stats(const ColumnVariant& variant, 
DocValuePathStats* stat
     }
 }
 
-// Materialize sparse subcolumns for each path and build per-path non-null 
counts.
+// Materialize sparse subcolumns for each path using precomputed per-path 
non-null counts.
 // For each row, we decode only present (path, value) pairs and append them to 
the
 // corresponding subcolumn, while recording the row id to allow gap filling 
later.
-void build_sparse_subcolumns_and_stats(const ColumnVariant& variant,
-                                       DocSparseSubcolumns* subcolumns, 
DocValuePathStats* stats) {
+void build_sparse_subcolumns(const ColumnVariant& variant, const 
DocValuePathStats& stats,
+                             DocSparseSubcolumns* subcolumns) {
     auto [column_key, column_value] = 
variant.get_doc_value_data_paths_and_values();
     const auto& column_offsets = variant.serialized_doc_value_column_offsets();
     const size_t num_rows = column_offsets.size();
 
     subcolumns->clear();
-    stats->clear();
-    subcolumns->reserve(column_key->size());
+    subcolumns->reserve(stats.size());
 
     for (size_t row = 0; row < num_rows; ++row) {
         const size_t start = column_offsets[row - 1];
         const size_t end = column_offsets[row];
         for (size_t i = start; i < end; ++i) {
             const StringRef path = column_key->get_data_at(i);
-            auto& data = subcolumns->try_emplace(path).first->second;
+            auto stat_it = stats.find(path);
+            DCHECK(stat_it != stats.end());
+            auto [data_it, inserted] = subcolumns->try_emplace(path);
+            auto& data = data_it->second;
+            if (inserted) {
+                data.rowids.reserve(stat_it->second);
+            }
             data.rowids.push_back(cast_set<uint32_t>(row));
             data.subcolumn.deserialize_from_binary_column(column_value, i);
             ++data.non_null_count;
         }
     }
-
-    stats->reserve(subcolumns->size());
-    for (const auto& [path, data] : *subcolumns) {
-        stats->try_emplace(path, data.non_null_count);
-    }
 }
 
 SubcolumnWritePlan build_subcolumn_write_plan(const ColumnVariant& variant, 
size_t num_rows,
@@ -263,7 +265,8 @@ SubcolumnWritePlan build_subcolumn_write_plan(const 
ColumnVariant& variant, size
     }
 
     if (config::enable_variant_doc_sparse_write_subcolumns) {
-        build_sparse_subcolumns_and_stats(variant, &plan.sparse_subcolumns, 
&plan.stats);
+        build_doc_value_stats(variant, &plan.stats);
+        build_sparse_subcolumns(variant, plan.stats, &plan.sparse_subcolumns);
         plan.entries.reserve(plan.sparse_subcolumns.size());
         for (auto& [path, sparse] : plan.sparse_subcolumns) {
             SubcolumnWriteEntry entry;
@@ -277,7 +280,8 @@ SubcolumnWritePlan build_subcolumn_write_plan(const 
ColumnVariant& variant, size
     }
 
     build_doc_value_stats(variant, &plan.stats);
-    plan.dense_subcolumns = 
variant_util::materialize_docs_to_subcolumns_map(variant);
+    plan.dense_subcolumns =
+            variant_util::materialize_docs_to_subcolumns_map(variant, 
plan.stats.size());
     plan.entries.reserve(plan.dense_subcolumns.size());
     for (auto& [path, subcolumn] : plan.dense_subcolumns) {
         SubcolumnWriteEntry entry;
@@ -295,20 +299,39 @@ Status execute_doc_write_pipeline(const ColumnVariant& 
variant, size_t num_rows,
                                   WriteMaterializedFn&& write_materialized_fn,
                                   WriteDocValueFn&& write_doc_value_fn,
                                   DocValuePathStats* out_column_stats) {
-    SubcolumnWritePlan plan =
-            build_subcolumn_write_plan(variant, num_rows, 
variant_doc_materialization_min_rows);
-    *out_column_stats = std::move(plan.stats);
-    if (out_column_stats->empty()) {
-        build_doc_value_stats(variant, out_column_stats);
-    }
+    {
+        SubcolumnWritePlan plan =
+                build_subcolumn_write_plan(variant, num_rows, 
variant_doc_materialization_min_rows);
+        *out_column_stats = std::move(plan.stats);
+        if (out_column_stats->empty()) {
+            build_doc_value_stats(variant, out_column_stats);
+        }
 
-    for (auto& entry : plan.entries) {
-        RETURN_IF_ERROR(write_materialized_fn(entry, column_id));
+        for (auto& entry : plan.entries) {
+            RETURN_IF_ERROR(write_materialized_fn(entry, column_id));
+        }
     }
     RETURN_IF_ERROR(write_doc_value_fn(column_id));
     return Status::OK();
 }
 
+Status finish_and_write_column_writer(ColumnWriter* writer) {
+    RETURN_IF_ERROR(writer->finish());
+    RETURN_IF_ERROR(writer->write_data());
+    return Status::OK();
+}
+
+void release_processed_subcolumn_write_entry(SubcolumnWriteEntry* entry) {
+    DCHECK(entry != nullptr);
+    DCHECK(entry->subcolumn != nullptr);
+    ColumnVariant::Subcolumn released_subcolumn;
+    std::swap(*entry->subcolumn, released_subcolumn);
+    if (entry->rowids != nullptr) {
+        std::vector<uint32_t> released_rowids;
+        released_rowids.swap(*entry->rowids);
+    }
+}
+
 bool is_invalid_materialized_subcolumn_type(const DataTypePtr& type) {
     return variant_util::get_base_type_of_array(type)->get_primitive_type() ==
            PrimitiveType::INVALID_TYPE;
@@ -1736,6 +1759,9 @@ Status VariantDocCompactWriter::finish() {
     if (!is_finalized()) {
         RETURN_IF_ERROR(finalize());
     }
+    if (_data_written) {
+        return Status::OK();
+    }
     for (auto& column_writer : _subcolumn_writers) {
         RETURN_IF_ERROR(column_writer->finish());
     }
@@ -1746,10 +1772,14 @@ Status VariantDocCompactWriter::write_data() {
     if (!is_finalized()) {
         RETURN_IF_ERROR(finalize());
     }
+    if (_data_written) {
+        return Status::OK();
+    }
     for (auto& column_writer : _subcolumn_writers) {
         RETURN_IF_ERROR(column_writer->write_data());
     }
     RETURN_IF_ERROR(_doc_value_column_writer->write_data());
+    _data_written = true;
     return Status::OK();
 }
 Status VariantDocCompactWriter::write_ordinal_index() {
@@ -1854,13 +1884,21 @@ Status VariantDocCompactWriter::finalize() {
             *variant_column, num_rows, variant_doc_materialization_min_rows, 
column_id,
             [this, &parent_column, num_rows, &converter](SubcolumnWriteEntry& 
entry,
                                                          int& 
materialized_column_id) {
-                return _write_materialized_subcolumn(parent_column, 
entry.path, *entry.subcolumn,
-                                                     num_rows, converter.get(),
-                                                     materialized_column_id, 
entry.rowids);
+                const size_t prev_writer_count = _subcolumn_writers.size();
+                RETURN_IF_ERROR(_write_materialized_subcolumn(
+                        parent_column, entry.path, *entry.subcolumn, num_rows, 
converter.get(),
+                        materialized_column_id, entry.rowids));
+                DCHECK_EQ(_subcolumn_writers.size(), prev_writer_count + 1);
+                
RETURN_IF_ERROR(finish_and_write_column_writer(_subcolumn_writers.back().get()));
+                release_processed_subcolumn_write_entry(&entry);
+                return Status::OK();
             },
             [this, &parent_column, variant_column, &converter, num_rows](int 
doc_value_column_id) {
-                return _write_doc_value_column(parent_column, variant_column, 
converter.get(),
-                                               doc_value_column_id, num_rows);
+                RETURN_IF_ERROR(_write_doc_value_column(parent_column, 
variant_column,
+                                                        converter.get(), 
doc_value_column_id,
+                                                        num_rows));
+                
RETURN_IF_ERROR(finish_and_write_column_writer(_doc_value_column_writer.get()));
+                return Status::OK();
             },
             &column_stats));
 
@@ -1870,6 +1908,8 @@ Status VariantDocCompactWriter::finalize() {
     for (const auto& [k, cnt] : column_stats) {
         (*doc_value_column_non_null_size)[std::string(k)] = cnt;
     }
+    _column = ColumnVariant::create(0, false);
+    _data_written = true;
     _is_finalized = true;
     return Status::OK();
 }
diff --git a/be/src/storage/segment/variant/variant_column_writer_impl.h 
b/be/src/storage/segment/variant/variant_column_writer_impl.h
index 15892ab6331..b4100388995 100644
--- a/be/src/storage/segment/variant/variant_column_writer_impl.h
+++ b/be/src/storage/segment/variant/variant_column_writer_impl.h
@@ -284,6 +284,7 @@ private:
     const TabletColumn* _tablet_column = nullptr;
     ColumnWriterOptions _opts;
     bool _is_finalized = false;
+    bool _data_written = false;
     std::unique_ptr<ColumnWriter> _doc_value_column_writer;
     std::vector<std::unique_ptr<ColumnWriter>> _subcolumn_writers;
     std::vector<TabletIndexes> _subcolumns_indexes;
diff --git a/be/test/storage/segment/variant_column_writer_reader_test.cpp 
b/be/test/storage/segment/variant_column_writer_reader_test.cpp
index e309aa6f2b1..a91eb450357 100644
--- a/be/test/storage/segment/variant_column_writer_reader_test.cpp
+++ b/be/test/storage/segment/variant_column_writer_reader_test.cpp
@@ -145,6 +145,9 @@ static Status create_variant_root_reader(const 
SegmentFooterPB& footer,
     return Status::OK();
 }
 
+static std::string expected_doc_bucket_json_from_full(const std::string& 
full_json, int bucket_num,
+                                                      int bucket_index);
+
 class VariantColumnWriterReaderTest : public testing::Test {
 public:
     void SetUp() override {
@@ -334,6 +337,193 @@ protected:
         return Status::OK();
     }
 
+    void validate_doc_compact_writer_roundtrip(bool repeat_finish_write_calls) 
{
+        constexpr int kRows = 200;
+        constexpr int kDocBuckets = 4;
+        constexpr int kBucket = 0;
+
+        TabletSchemaPB schema_pb;
+        schema_pb.set_keys_type(KeysType::DUP_KEYS);
+        construct_column(schema_pb.add_column(), 1, "VARIANT", "V1", 3, false, 
false,
+                         /*variant_sparse_hash_shard_count=*/0,
+                         /*variant_enable_doc_mode=*/true,
+                         /*variant_doc_materialization_min_rows=*/0,
+                         /*variant_doc_hash_shard_count=*/kDocBuckets);
+        _tablet_schema = std::make_shared<TabletSchema>();
+        _tablet_schema->init_from_pb(schema_pb);
+
+        TabletColumn parent_column = _tablet_schema->column(0);
+        TabletColumn extracted_doc_bucket =
+                variant_util::create_doc_value_column(parent_column, kBucket);
+        extracted_doc_bucket.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
+        extracted_doc_bucket.set_is_nullable(false);
+        _tablet_schema->append_column(extracted_doc_bucket);
+
+        TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));
+        _tablet_schema->set_external_segment_meta_used_default(false);
+        tablet_meta->_tablet_id = 33000;
+        _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, 
_data_dir.get());
+        EXPECT_TRUE(_tablet->init().ok());
+        
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+        
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
+
+        io::FileWriterPtr file_writer;
+        auto file_path = local_segment_path(_tablet->tablet_path(), "0", 0);
+        auto st = io::global_local_filesystem()->create_file(file_path, 
&file_writer);
+        EXPECT_TRUE(st.ok()) << st.msg();
+
+        SegmentFooterPB footer;
+
+        RowsetWriterContext rowset_ctx;
+        rowset_ctx.write_type = DataWriteType::TYPE_DIRECT;
+        rowset_ctx.tablet_schema = _tablet_schema;
+
+        ColumnWriterOptions root_opts;
+        root_opts.meta = footer.add_columns();
+        root_opts.compression_type = CompressionTypePB::LZ4;
+        root_opts.file_writer = file_writer.get();
+        root_opts.footer = &footer;
+        root_opts.rowset_ctx = &rowset_ctx;
+        _init_column_meta(root_opts.meta, 0, parent_column, 
CompressionTypePB::LZ4);
+
+        std::unique_ptr<ColumnWriter> root_writer;
+        EXPECT_TRUE(ColumnWriter::create(root_opts, &parent_column, 
file_writer.get(), &root_writer)
+                            .ok());
+        EXPECT_TRUE(root_writer->init().ok());
+
+        TabletColumn extracted_doc_bucket_col = _tablet_schema->column(1);
+        ColumnWriterOptions doc_compact_opts = root_opts;
+        doc_compact_opts.meta = footer.add_columns();
+        _init_column_meta(doc_compact_opts.meta, 0, extracted_doc_bucket_col,
+                          CompressionTypePB::LZ4);
+        std::unique_ptr<ColumnWriter> doc_compact_writer;
+        EXPECT_TRUE(ColumnWriter::create(doc_compact_opts, 
&extracted_doc_bucket_col,
+                                         file_writer.get(), 
&doc_compact_writer)
+                            .ok());
+        EXPECT_TRUE(doc_compact_writer->init().ok());
+
+        std::unordered_map<int, std::string> inserted_full_json;
+        auto type_string = std::make_shared<DataTypeString>();
+        auto full_json_column = type_string->create_column();
+        auto* full_strings = 
assert_cast<ColumnString*>(full_json_column.get());
+        VariantUtil::fill_string_column_with_test_data(full_strings, kRows, 
&inserted_full_json);
+
+        std::unordered_map<int, std::string> expected_bucket_json;
+        auto bucket_json_column = type_string->create_column();
+        auto* bucket_strings = 
assert_cast<ColumnString*>(bucket_json_column.get());
+        for (int i = 0; i < kRows; ++i) {
+            const std::string& full = inserted_full_json[i];
+            std::string bucket_json =
+                    expected_doc_bucket_json_from_full(full, kDocBuckets, 
kBucket);
+            expected_bucket_json.emplace(i, bucket_json);
+            bucket_strings->insert_data(bucket_json.data(), 
bucket_json.size());
+        }
+
+        ParseConfig config;
+        config.deprecated_enable_flatten_nested = false;
+        config.parse_to = ParseConfig::ParseTo::OnlyDocValueColumn;
+
+        MutableColumnPtr root_variant =
+                
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), true);
+        variant_util::parse_json_to_variant(*root_variant, *full_strings, 
config);
+
+        MutableColumnPtr bucket_variant =
+                
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), true);
+        variant_util::parse_json_to_variant(*bucket_variant, *bucket_strings, 
config);
+
+        auto root_data = std::make_unique<VariantColumnData>();
+        root_data->column_data = root_variant.get();
+        root_data->row_pos = 0;
+        const auto* root_ptr = reinterpret_cast<const 
uint8_t*>(root_data.get());
+        EXPECT_TRUE(root_writer->append_data(&root_ptr, kRows).ok());
+
+        auto bucket_data = std::make_unique<VariantColumnData>();
+        bucket_data->column_data = bucket_variant.get();
+        bucket_data->row_pos = 0;
+        const auto* bucket_ptr = reinterpret_cast<const 
uint8_t*>(bucket_data.get());
+        EXPECT_TRUE(doc_compact_writer->append_data(&bucket_ptr, kRows).ok());
+
+        EXPECT_TRUE(root_writer->finish().ok());
+        EXPECT_TRUE(doc_compact_writer->finish().ok());
+        if (repeat_finish_write_calls) {
+            EXPECT_TRUE(doc_compact_writer->finish().ok());
+        }
+        EXPECT_TRUE(root_writer->write_data().ok());
+        EXPECT_TRUE(doc_compact_writer->write_data().ok());
+        if (repeat_finish_write_calls) {
+            EXPECT_TRUE(doc_compact_writer->write_data().ok());
+            EXPECT_TRUE(doc_compact_writer->finish().ok());
+        }
+        EXPECT_TRUE(root_writer->write_ordinal_index().ok());
+        EXPECT_TRUE(doc_compact_writer->write_ordinal_index().ok());
+        EXPECT_TRUE(file_writer->close().ok());
+        footer.set_num_rows(kRows);
+
+        io::FileReaderSPtr file_reader;
+        st = io::global_local_filesystem()->open_file(file_path, &file_reader);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        std::shared_ptr<ColumnReader> column_reader;
+        st = create_variant_root_reader(footer, file_reader, _tablet_schema, 
&column_reader);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        auto* variant_column_reader = 
assert_cast<VariantColumnReader*>(column_reader.get());
+        EXPECT_TRUE(variant_column_reader != nullptr);
+
+        bool checked_one_key = false;
+        for (int j = 0; j < 10; ++j) {
+            const std::string key = "key" + std::to_string(j);
+            StringRef ref {key.data(), key.size()};
+            if (variant_util::variant_binary_shard_of(ref, kDocBuckets) ==
+                static_cast<uint32_t>(kBucket)) {
+                
EXPECT_TRUE(variant_column_reader->get_subcolumn_meta_by_path(PathInData(key)) 
!=
+                            nullptr);
+                checked_one_key = true;
+                break;
+            }
+        }
+        EXPECT_TRUE(checked_one_key);
+
+        MockColumnReaderCache column_reader_cache(footer, file_reader, 
_tablet_schema);
+        StorageReadOptions storage_read_opts;
+        storage_read_opts.io_ctx.reader_type = 
ReaderType::READER_BASE_COMPACTION;
+        storage_read_opts.tablet_schema = _tablet_schema;
+        OlapReaderStatistics stats;
+        storage_read_opts.stats = &stats;
+
+        TabletColumn doc_bucket_map = 
variant_util::create_doc_value_column(parent_column, kBucket);
+        ColumnIteratorUPtr it;
+        st = variant_column_reader->new_iterator(&it, &doc_bucket_map, 
&storage_read_opts,
+                                                 &column_reader_cache);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        
EXPECT_TRUE(dynamic_cast<segment_v2::VariantDocValueCompactIterator*>(it.get()) 
!= nullptr);
+
+        ColumnIteratorOptions column_iter_opts;
+        column_iter_opts.stats = &stats;
+        column_iter_opts.file_reader = file_reader.get();
+        st = it->init(column_iter_opts);
+        EXPECT_TRUE(st.ok()) << st.msg();
+
+        DataTypeSerDe::FormatOptions options;
+        auto tz = cctz::utc_time_zone();
+        options.timezone = &tz;
+
+        MutableColumnPtr dst =
+                
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), false);
+        size_t nrows = kRows;
+        st = it->seek_to_ordinal(0);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        st = it->next_batch(&nrows, dst);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        EXPECT_EQ(nrows, kRows);
+
+        for (int i = 0; i < kRows; ++i) {
+            std::string value;
+            
assert_cast<ColumnVariant*>(dst.get())->serialize_one_row_to_string(i, &value, 
options);
+            EXPECT_EQ(value, expected_bucket_json[i]);
+        }
+
+        
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+    }
+
     TabletSchemaSPtr _tablet_schema = nullptr;
     StorageEngine* _engine_ref = nullptr;
     std::unique_ptr<DataDir> _data_dir = nullptr;
@@ -1435,198 +1625,12 @@ TEST_F(VariantColumnWriterReaderTest, 
test_read_doc_compact_from_doc_value_bucke
 }
 
 TEST_F(VariantColumnWriterReaderTest, 
test_write_doc_compact_writer_and_read_doc_compact) {
-    constexpr int kRows = 200;
-    constexpr int kDocBuckets = 4;
-    constexpr int kBucket = 0;
-
-    // 1. create tablet_schema: root variant is in doc mode; plus one 
extracted doc bucket column
-    TabletSchemaPB schema_pb;
-    schema_pb.set_keys_type(KeysType::DUP_KEYS);
-    construct_column(schema_pb.add_column(), 1, "VARIANT", "V1", 3, false, 
false,
-                     /*variant_sparse_hash_shard_count=*/0,
-                     /*variant_enable_doc_mode=*/true,
-                     /*variant_doc_materialization_min_rows=*/0,
-                     /*variant_doc_hash_shard_count=*/kDocBuckets);
-    _tablet_schema = std::make_shared<TabletSchema>();
-    _tablet_schema->init_from_pb(schema_pb);
-
-    TabletColumn parent_column = _tablet_schema->column(0);
-    TabletColumn extracted_doc_bucket =
-            variant_util::create_doc_value_column(parent_column, kBucket);
-    // This matches VariantCompactionUtil::get_extended_compaction_schema 
behavior:
-    // extracted doc bucket columns are represented as VARIANT to trigger 
VariantDocCompactWriter.
-    extracted_doc_bucket.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
-    extracted_doc_bucket.set_is_nullable(false);
-    _tablet_schema->append_column(extracted_doc_bucket);
-
-    // 2. create tablet
-    TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));
-    _tablet_schema->set_external_segment_meta_used_default(false);
-    tablet_meta->_tablet_id = 33000;
-    _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, 
_data_dir.get());
-    EXPECT_TRUE(_tablet->init().ok());
-    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
-    
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
-
-    // 3. create file_writer
-    io::FileWriterPtr file_writer;
-    auto file_path = local_segment_path(_tablet->tablet_path(), "0", 0);
-    auto st = io::global_local_filesystem()->create_file(file_path, 
&file_writer);
-    EXPECT_TRUE(st.ok()) << st.msg();
-
-    // 4. create column writers: root VariantColumnWriter + extracted 
VariantDocCompactWriter
-    SegmentFooterPB footer;
-
-    RowsetWriterContext rowset_ctx;
-    rowset_ctx.write_type = DataWriteType::TYPE_DIRECT;
-    rowset_ctx.tablet_schema = _tablet_schema;
-
-    ColumnWriterOptions root_opts;
-    root_opts.meta = footer.add_columns();
-    root_opts.compression_type = CompressionTypePB::LZ4;
-    root_opts.file_writer = file_writer.get();
-    root_opts.footer = &footer;
-    root_opts.rowset_ctx = &rowset_ctx;
-    _init_column_meta(root_opts.meta, 0, parent_column, 
CompressionTypePB::LZ4);
-
-    std::unique_ptr<ColumnWriter> root_writer;
-    EXPECT_TRUE(
-            ColumnWriter::create(root_opts, &parent_column, file_writer.get(), 
&root_writer).ok());
-    EXPECT_TRUE(root_writer->init().ok());
-
-    TabletColumn extracted_doc_bucket_col = _tablet_schema->column(1);
-    ColumnWriterOptions doc_compact_opts = root_opts;
-    doc_compact_opts.meta = footer.add_columns();
-    _init_column_meta(doc_compact_opts.meta, 0, extracted_doc_bucket_col, 
CompressionTypePB::LZ4);
-    std::unique_ptr<ColumnWriter> doc_compact_writer;
-    EXPECT_TRUE(ColumnWriter::create(doc_compact_opts, 
&extracted_doc_bucket_col, file_writer.get(),
-                                     &doc_compact_writer)
-                        .ok());
-    EXPECT_TRUE(doc_compact_writer->init().ok());
-
-    // 5. build doc-value-only data:
-    // - root column uses the full JSON (doc values only is enough for this 
test)
-    // - extracted doc bucket column uses bucket-filtered JSON so that doc 
bucket data matches
-    //   the bucket index expected by VariantDocCompactWriter.
-    std::unordered_map<int, std::string> inserted_full_json;
-    auto type_string = std::make_shared<DataTypeString>();
-    auto full_json_column = type_string->create_column();
-    auto* full_strings = assert_cast<ColumnString*>(full_json_column.get());
-    VariantUtil::fill_string_column_with_test_data(full_strings, kRows, 
&inserted_full_json);
-
-    std::unordered_map<int, std::string> expected_bucket_json;
-    auto bucket_json_column = type_string->create_column();
-    auto* bucket_strings = 
assert_cast<ColumnString*>(bucket_json_column.get());
-    for (int i = 0; i < kRows; ++i) {
-        const std::string& full = inserted_full_json[i];
-        std::string bucket_json = expected_doc_bucket_json_from_full(full, 
kDocBuckets, kBucket);
-        expected_bucket_json.emplace(i, bucket_json);
-        bucket_strings->insert_data(bucket_json.data(), bucket_json.size());
-    }
-
-    ParseConfig config;
-    config.deprecated_enable_flatten_nested = false;
-    config.parse_to = ParseConfig::ParseTo::OnlyDocValueColumn;
-
-    MutableColumnPtr root_variant =
-            
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), true);
-    variant_util::parse_json_to_variant(*root_variant, *full_strings, config);
-
-    MutableColumnPtr bucket_variant =
-            
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), true);
-    variant_util::parse_json_to_variant(*bucket_variant, *bucket_strings, 
config);
-
-    // 6. append and write
-    {
-        auto root_data = std::make_unique<VariantColumnData>();
-        root_data->column_data = root_variant.get();
-        root_data->row_pos = 0;
-        const auto* data = reinterpret_cast<const uint8_t*>(root_data.get());
-        EXPECT_TRUE(root_writer->append_data(&data, kRows).ok());
-    }
-    {
-        auto bucket_data = std::make_unique<VariantColumnData>();
-        bucket_data->column_data = bucket_variant.get();
-        bucket_data->row_pos = 0;
-        const auto* data = reinterpret_cast<const uint8_t*>(bucket_data.get());
-        EXPECT_TRUE(doc_compact_writer->append_data(&data, kRows).ok());
-    }
-
-    EXPECT_TRUE(root_writer->finish().ok());
-    EXPECT_TRUE(doc_compact_writer->finish().ok());
-    EXPECT_TRUE(root_writer->write_data().ok());
-    EXPECT_TRUE(doc_compact_writer->write_data().ok());
-    EXPECT_TRUE(root_writer->write_ordinal_index().ok());
-    EXPECT_TRUE(doc_compact_writer->write_ordinal_index().ok());
-    EXPECT_TRUE(file_writer->close().ok());
-    footer.set_num_rows(kRows);
-
-    // 7. open reader and validate:
-    // - doc bucket can be read via DOC_COMPACT iterator in flat-leaf 
compaction mode
-    // - materialized leaf meta exists for at least one key in this bucket
-    io::FileReaderSPtr file_reader;
-    st = io::global_local_filesystem()->open_file(file_path, &file_reader);
-    EXPECT_TRUE(st.ok()) << st.msg();
-    std::shared_ptr<ColumnReader> column_reader;
-    st = create_variant_root_reader(footer, file_reader, _tablet_schema, 
&column_reader);
-    EXPECT_TRUE(st.ok()) << st.msg();
-    auto* variant_column_reader = 
assert_cast<VariantColumnReader*>(column_reader.get());
-    EXPECT_TRUE(variant_column_reader != nullptr);
-
-    bool checked_one_key = false;
-    for (int j = 0; j < 10; ++j) {
-        const std::string key = "key" + std::to_string(j);
-        StringRef ref {key.data(), key.size()};
-        if (variant_util::variant_binary_shard_of(ref, kDocBuckets) ==
-            static_cast<uint32_t>(kBucket)) {
-            
EXPECT_TRUE(variant_column_reader->get_subcolumn_meta_by_path(PathInData(key)) 
!=
-                        nullptr);
-            checked_one_key = true;
-            break;
-        }
-    }
-    EXPECT_TRUE(checked_one_key);
-
-    MockColumnReaderCache column_reader_cache(footer, file_reader, 
_tablet_schema);
-    StorageReadOptions storage_read_opts;
-    storage_read_opts.io_ctx.reader_type = ReaderType::READER_BASE_COMPACTION;
-    storage_read_opts.tablet_schema = _tablet_schema;
-    OlapReaderStatistics stats;
-    storage_read_opts.stats = &stats;
-
-    TabletColumn doc_bucket_map = 
variant_util::create_doc_value_column(parent_column, kBucket);
-    ColumnIteratorUPtr it;
-    st = variant_column_reader->new_iterator(&it, &doc_bucket_map, 
&storage_read_opts,
-                                             &column_reader_cache);
-    EXPECT_TRUE(st.ok()) << st.msg();
-    
EXPECT_TRUE(dynamic_cast<segment_v2::VariantDocValueCompactIterator*>(it.get()) 
!= nullptr);
-
-    ColumnIteratorOptions column_iter_opts;
-    column_iter_opts.stats = &stats;
-    column_iter_opts.file_reader = file_reader.get();
-    st = it->init(column_iter_opts);
-    EXPECT_TRUE(st.ok()) << st.msg();
-
-    DataTypeSerDe::FormatOptions options;
-    auto tz = cctz::utc_time_zone();
-    options.timezone = &tz;
-
-    MutableColumnPtr dst =
-            
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), false);
-    size_t nrows = kRows;
-    st = it->seek_to_ordinal(0);
-    EXPECT_TRUE(st.ok()) << st.msg();
-    st = it->next_batch(&nrows, dst);
-    EXPECT_TRUE(st.ok()) << st.msg();
-    EXPECT_EQ(nrows, kRows);
-
-    for (int i = 0; i < kRows; ++i) {
-        std::string value;
-        assert_cast<ColumnVariant*>(dst.get())->serialize_one_row_to_string(i, 
&value, options);
-        EXPECT_EQ(value, expected_bucket_json[i]);
-    }
+    validate_doc_compact_writer_roundtrip(false);
+}
 
-    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+TEST_F(VariantColumnWriterReaderTest,
+       
test_write_doc_compact_writer_finish_write_data_idempotent_and_read_doc_compact)
 {
+    validate_doc_compact_writer_roundtrip(true);
 }
 
 TEST_F(VariantColumnWriterReaderTest, test_doc_compact_sparse_write_array_gap) 
{
diff --git a/be/test/storage/segment/variant_util_test.cpp 
b/be/test/storage/segment/variant_util_test.cpp
index 9fb72343eba..e9f126faca0 100644
--- a/be/test/storage/segment/variant_util_test.cpp
+++ b/be/test/storage/segment/variant_util_test.cpp
@@ -95,6 +95,67 @@ TEST(VariantUtilTest, 
ParseDocValueToSubcolumns_FillsDefaultsAndValues) {
     EXPECT_EQ(fb.field.get_type(), PrimitiveType::TYPE_NULL); // missing
 }
 
+TEST(VariantUtilTest, 
MaterializeDocsToSubcolumnsMap_ExpectedUniquePathsPreservesValues) { // hint vs no-hint parity + per-row values
+    const std::vector<std::string_view> jsons = { // 3 rows; keys a, b, c each appear in exactly 2 rows
+            R"({"a":1,"b":"x"})", //
+            R"({"b":"y","c":2})", //
+            R"({"a":3,"c":4})",   //
+    };
+
+    auto variant = ColumnVariant::create(0, true);
+    auto json_col = _make_json_column(jsons);
+
+    ParseConfig cfg;
+    cfg.deprecated_enable_flatten_nested = false;
+    cfg.parse_to = ParseConfig::ParseTo::OnlyDocValueColumn; // parse into the doc-value column only
+    parse_json_to_variant(*variant, *json_col, cfg);
+
+    EXPECT_TRUE(variant->is_doc_mode());
+
+    auto default_subcolumns = materialize_docs_to_subcolumns_map(*variant); // baseline: no size hint
+    auto subcolumns = materialize_docs_to_subcolumns_map(*variant, 3); // presumably expected_unique_paths = 3 (a, b, c)
+    ASSERT_EQ(subcolumns.size(), default_subcolumns.size()); // the hint must not change which paths are produced
+    ASSERT_EQ(subcolumns.size(), 3); // exactly paths a, b, c
+
+    auto& a = subcolumns.at("a");
+    auto& b = subcolumns.at("b");
+    auto& c = subcolumns.at("c");
+    a.finalize();
+    b.finalize();
+    c.finalize();
+    EXPECT_EQ(a.size(), jsons.size()); // each subcolumn spans every row, even rows missing the key
+    EXPECT_EQ(b.size(), jsons.size());
+    EXPECT_EQ(c.size(), jsons.size());
+
+    FieldWithDataType f; // output slot reused by every get() below
+    a.get(0, f);
+    EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_BIGINT);
+    EXPECT_EQ(f.field.get<TYPE_BIGINT>(), 1);
+    a.get(1, f);
+    EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_NULL); // row 1 has no "a"
+    a.get(2, f);
+    EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_BIGINT);
+    EXPECT_EQ(f.field.get<TYPE_BIGINT>(), 3);
+
+    b.get(0, f);
+    EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_STRING);
+    EXPECT_EQ(f.field.get<TYPE_STRING>(), "x");
+    b.get(1, f);
+    EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_STRING);
+    EXPECT_EQ(f.field.get<TYPE_STRING>(), "y");
+    b.get(2, f);
+    EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_NULL); // row 2 has no "b"
+
+    c.get(0, f);
+    EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_NULL); // row 0 has no "c"
+    c.get(1, f);
+    EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_BIGINT);
+    EXPECT_EQ(f.field.get<TYPE_BIGINT>(), 2);
+    c.get(2, f);
+    EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_BIGINT);
+    EXPECT_EQ(f.field.get<TYPE_BIGINT>(), 4);
+}
+
 TEST(VariantUtilTest, ParseOnlyDocValueColumn_SerializesMixedTypes) {
     const std::vector<std::string_view> jsons = {
             
R"({"b":true,"d":1.5,"u":18446744073709551615,"arr":[1,2,3],"arr2":[[1],[2]],"s":"x"})",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to