eldenmoon commented on code in PR #59183:
URL: https://github.com/apache/doris/pull/59183#discussion_r2638841062


##########
be/src/olap/rowset/segment_v2/variant/variant_util.h:
##########
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "vec/json/json_parser.h"
+
+namespace doris {
+class TabletSchema;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::segment_v2::variant_util {
+
+// Parse variant columns by picking variant positions from `column_pos` and 
generating ParseConfig
+// based on tablet schema settings (flatten nested / doc snapshot mode).
+Status parse_variant_columns(vectorized::Block& block, const TabletSchema& 
tablet_schema,
+                             const std::vector<uint32_t>& column_pos);
+
+// Moved from `vec/common/schema_util.{h,cpp}`.

Review Comment:
   remove this comment



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/types/VariantType.java:
##########
@@ -51,13 +51,24 @@ public class VariantType extends PrimitiveType {
     private final List<VariantField> predefinedFields;
     private final int variantSparseHashShardCount;
 
-    // No predefined fields
+    private final boolean enableVariantDocSnapshotMode;
+    private final long variantDocSnapshotMinRows;
+    private final int variantDocSnapshotShardCount;
+
+    /**
+     * Creates a Variant type without predefined fields and only configures 
the max subcolumn limit.
+     *
+     * @param variantMaxSubcolumnsCount max number of subcolumns allowed (0 
means unlimited)
+     */
     public VariantType(int variantMaxSubcolumnsCount) {
         this.variantMaxSubcolumnsCount = variantMaxSubcolumnsCount;
         this.predefinedFields = Lists.newArrayList();
         this.enableTypedPathsToSparse = false;
         this.variantMaxSparseColumnStatisticsSize = 10000;
         this.variantSparseHashShardCount = 0;
+        this.enableVariantDocSnapshotMode = false;
+        this.variantDocSnapshotMinRows = 0L;
+        this.variantDocSnapshotShardCount = 128;

Review Comment:
   is 128 suitable?



##########
regression-test/suites/variant_p0/doc_snapshot/predefine/load.groovy:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file

Review Comment:
   Add a test case that mixes the new encoding mode with the legacy encoding mode.



##########
regression-test/suites/variant_p0/compaction/test_compaction.groovy:
##########
@@ -57,6 +57,8 @@ suite("test_compaction_variant") {
             """
         }
 
+        sql """ set default_variant_enable_doc_snapshot_mode = false; """

Review Comment:
   why add this?



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java:
##########
@@ -265,6 +265,13 @@ public class PropertyAnalyzer {
     // number of buckets when using bucketized sparse serialization
     public static final String PROPERTIES_VARIANT_SPARSE_HASH_SHARD_COUNT = 
"variant_sparse_hash_shard_count";
 
+    public static final String PROPERTIES_VARIANT_ENABLE_DOC_SNAPSHOT_MODE = 
"variant_enable_doc_snapshot_mode";

Review Comment:
   The newly added properties should be mutually exclusive with 
`variant_max_subcolumns_count`.



##########
be/src/olap/rowset/segment_v2/variant/variant_column_reader.h:
##########
@@ -386,6 +391,8 @@ class VariantColumnReader : public ColumnReader {
     uint64_t _num_rows {0};
     uint32_t _root_unique_id {0};
 
+    std::unordered_map<uint32_t, std::shared_ptr<ColumnReader>> 
_doc_snapshot_column_readers;

Review Comment:
   跟sparse能不能统一reader, sparse和doc本身底层存储是一样的而且互斥



##########
be/src/olap/rowset/segment_v2/variant/variant_util.cpp:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/variant/variant_util.h"
+
+#include <glog/logging.h>
+
+#include <memory>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/tablet_schema.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_variant.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/schema_util.h"
+#include "vec/data_types/data_type_jsonb.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_variant.h"
+#include "vec/json/parse2column.h"
+
+namespace doris::segment_v2::variant_util {
+
+namespace {
+
+Status _parse_variant_columns(vectorized::Block& block, const 
std::vector<uint32_t>& variant_pos,
+                              const std::vector<vectorized::ParseConfig>& 
configs) {
+    for (size_t i = 0; i < variant_pos.size(); ++i) {
+        auto column_ref = block.get_by_position(variant_pos[i]).column;
+        bool is_nullable = column_ref->is_nullable();
+        vectorized::MutableColumnPtr var_column = column_ref->assume_mutable();
+        if (is_nullable) {
+            const auto& nullable = assert_cast<const 
vectorized::ColumnNullable&>(*column_ref);
+            var_column = nullable.get_nested_column_ptr()->assume_mutable();
+        }
+        auto& var = assert_cast<vectorized::ColumnVariant&>(*var_column);
+        var_column->finalize();
+
+        vectorized::MutableColumnPtr variant_column;
+        if (var.is_doc_snapshot_mode()) {
+            // doc snapshot mode, we need to parse the doc snapshot column
+            vectorized::parse_binary_to_variant(var);
+            continue;
+        }
+        if (!var.is_scalar_variant()) {
+            // already parsed
+            continue;
+        }
+
+        VLOG_DEBUG << "parse scalar variant column: " << 
var.get_root_type()->get_name();
+        vectorized::ColumnPtr scalar_root_column;
+        if (var.get_root_type()->get_primitive_type() == TYPE_JSONB) {
+            // TODO more efficient way to parse jsonb type, currently we just 
convert jsonb to
+            // json str and parse them into variant
+            RETURN_IF_ERROR(vectorized::schema_util::cast_column(
+                    {var.get_root(), var.get_root_type(), ""},
+                    var.get_root()->is_nullable()
+                            ? 
make_nullable(std::make_shared<vectorized::DataTypeString>())
+                            : std::make_shared<vectorized::DataTypeString>(),
+                    &scalar_root_column));
+            if (scalar_root_column->is_nullable()) {
+                scalar_root_column =
+                        assert_cast<const 
vectorized::ColumnNullable*>(scalar_root_column.get())
+                                ->get_nested_column_ptr();
+            }
+        } else {
+            const auto& root = *var.get_root();
+            scalar_root_column = root.is_nullable()
+                                         ? assert_cast<const 
vectorized::ColumnNullable&>(root)
+                                                   .get_nested_column_ptr()
+                                         : var.get_root();
+        }
+
+        if (scalar_root_column->is_column_string()) {
+            variant_column = vectorized::ColumnVariant::create(0);
+            vectorized::parse_json_to_variant(
+                    *variant_column.get(),
+                    assert_cast<const 
vectorized::ColumnString&>(*scalar_root_column), configs[i]);
+        } else {
+            // Root maybe other types rather than string like 
ColumnVariant(Int32).
+            // In this case, we should finlize the root and cast to JSON type
+            auto expected_root_type =
+                    
make_nullable(std::make_shared<vectorized::ColumnVariant::MostCommonType>());
+            var.ensure_root_node_type(expected_root_type);
+            variant_column = var.assume_mutable();
+        }
+
+        // Wrap variant with nullmap if it is nullable
+        vectorized::ColumnPtr result = variant_column->get_ptr();
+        if (is_nullable) {
+            const auto& null_map = assert_cast<const 
vectorized::ColumnNullable&>(*column_ref)
+                                           .get_null_map_column_ptr();
+            result = vectorized::ColumnNullable::create(result, null_map);
+        }
+        block.get_by_position(variant_pos[i]).column = result;
+    }
+    return Status::OK();
+}
+
+} // namespace
+
+Status parse_variant_columns(vectorized::Block& block, const 
std::vector<uint32_t>& variant_pos,
+                             const std::vector<vectorized::ParseConfig>& 
configs) {
+    RETURN_IF_CATCH_EXCEPTION({ return _parse_variant_columns(block, 
variant_pos, configs); });
+}
+
+Status parse_variant_columns(vectorized::Block& block, const TabletSchema& 
tablet_schema,
+                             const std::vector<uint32_t>& column_pos) {
+    std::vector<uint32_t> variant_column_pos;
+    for (const auto& pos : column_pos) {
+        const auto& column = tablet_schema.column(pos);
+        if (column.is_variant_type()) {
+            variant_column_pos.push_back(pos);
+        }
+    }
+
+    if (variant_column_pos.empty()) {
+        return Status::OK();
+    }
+
+    std::vector<vectorized::ParseConfig> configs(variant_column_pos.size());
+    for (size_t i = 0; i < variant_column_pos.size(); ++i) {
+        configs[i].enable_flatten_nested = 
tablet_schema.variant_flatten_nested();
+        const auto& column = tablet_schema.column(variant_column_pos[i]);
+        if (column.is_variant_type()) {
+            // enable doc snapshot mode
+            if (column.variant_enable_doc_snapshot_mode()) {
+                // if has schema template, no need to parse to doc snapshot, 
when writing data, we
+                // will parse to doc snapshot

Review Comment:
   每一种状态需要有合理的场景



##########
be/src/vec/json/parse2column.cpp:
##########
@@ -237,4 +274,53 @@ void parse_json_to_variant(IColumn& column, const 
ColumnString& raw_json_column,
     column.finalize();
 }
 
+// pasre the doc snapshot column to subcolumns
+void parse_binary_to_variant(ColumnVariant& column_variant) {

Review Comment:
   Maybe renaming this to `shredding_binary_to_subcolumns` would be better.



##########
be/src/olap/rowset/segment_v2/variant/variant_util.cpp:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/variant/variant_util.h"
+
+#include <glog/logging.h>
+
+#include <memory>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/tablet_schema.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_variant.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/schema_util.h"
+#include "vec/data_types/data_type_jsonb.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_variant.h"
+#include "vec/json/parse2column.h"
+
+namespace doris::segment_v2::variant_util {
+
+namespace {
+
+Status _parse_variant_columns(vectorized::Block& block, const 
std::vector<uint32_t>& variant_pos,
+                              const std::vector<vectorized::ParseConfig>& 
configs) {
+    for (size_t i = 0; i < variant_pos.size(); ++i) {
+        auto column_ref = block.get_by_position(variant_pos[i]).column;
+        bool is_nullable = column_ref->is_nullable();
+        vectorized::MutableColumnPtr var_column = column_ref->assume_mutable();
+        if (is_nullable) {
+            const auto& nullable = assert_cast<const 
vectorized::ColumnNullable&>(*column_ref);
+            var_column = nullable.get_nested_column_ptr()->assume_mutable();
+        }
+        auto& var = assert_cast<vectorized::ColumnVariant&>(*var_column);
+        var_column->finalize();
+
+        vectorized::MutableColumnPtr variant_column;
+        if (var.is_doc_snapshot_mode()) {
+            // doc snapshot mode, we need to parse the doc snapshot column
+            vectorized::parse_binary_to_variant(var);
+            continue;
+        }
+        if (!var.is_scalar_variant()) {
+            // already parsed
+            continue;
+        }
+
+        VLOG_DEBUG << "parse scalar variant column: " << 
var.get_root_type()->get_name();
+        vectorized::ColumnPtr scalar_root_column;
+        if (var.get_root_type()->get_primitive_type() == TYPE_JSONB) {
+            // TODO more efficient way to parse jsonb type, currently we just 
convert jsonb to
+            // json str and parse them into variant
+            RETURN_IF_ERROR(vectorized::schema_util::cast_column(
+                    {var.get_root(), var.get_root_type(), ""},
+                    var.get_root()->is_nullable()
+                            ? 
make_nullable(std::make_shared<vectorized::DataTypeString>())
+                            : std::make_shared<vectorized::DataTypeString>(),
+                    &scalar_root_column));
+            if (scalar_root_column->is_nullable()) {
+                scalar_root_column =
+                        assert_cast<const 
vectorized::ColumnNullable*>(scalar_root_column.get())
+                                ->get_nested_column_ptr();
+            }
+        } else {
+            const auto& root = *var.get_root();
+            scalar_root_column = root.is_nullable()
+                                         ? assert_cast<const 
vectorized::ColumnNullable&>(root)
+                                                   .get_nested_column_ptr()
+                                         : var.get_root();
+        }
+
+        if (scalar_root_column->is_column_string()) {
+            variant_column = vectorized::ColumnVariant::create(0);
+            vectorized::parse_json_to_variant(
+                    *variant_column.get(),
+                    assert_cast<const 
vectorized::ColumnString&>(*scalar_root_column), configs[i]);
+        } else {
+            // Root maybe other types rather than string like 
ColumnVariant(Int32).
+            // In this case, we should finlize the root and cast to JSON type
+            auto expected_root_type =
+                    
make_nullable(std::make_shared<vectorized::ColumnVariant::MostCommonType>());
+            var.ensure_root_node_type(expected_root_type);
+            variant_column = var.assume_mutable();
+        }
+
+        // Wrap variant with nullmap if it is nullable
+        vectorized::ColumnPtr result = variant_column->get_ptr();
+        if (is_nullable) {
+            const auto& null_map = assert_cast<const 
vectorized::ColumnNullable&>(*column_ref)
+                                           .get_null_map_column_ptr();
+            result = vectorized::ColumnNullable::create(result, null_map);
+        }
+        block.get_by_position(variant_pos[i]).column = result;
+    }
+    return Status::OK();
+}
+
+} // namespace
+
+Status parse_variant_columns(vectorized::Block& block, const 
std::vector<uint32_t>& variant_pos,
+                             const std::vector<vectorized::ParseConfig>& 
configs) {
+    RETURN_IF_CATCH_EXCEPTION({ return _parse_variant_columns(block, 
variant_pos, configs); });
+}
+
+Status parse_variant_columns(vectorized::Block& block, const TabletSchema& 
tablet_schema,
+                             const std::vector<uint32_t>& column_pos) {
+    std::vector<uint32_t> variant_column_pos;
+    for (const auto& pos : column_pos) {
+        const auto& column = tablet_schema.column(pos);
+        if (column.is_variant_type()) {
+            variant_column_pos.push_back(pos);
+        }
+    }
+
+    if (variant_column_pos.empty()) {
+        return Status::OK();
+    }
+
+    std::vector<vectorized::ParseConfig> configs(variant_column_pos.size());
+    for (size_t i = 0; i < variant_column_pos.size(); ++i) {
+        configs[i].enable_flatten_nested = 
tablet_schema.variant_flatten_nested();
+        const auto& column = tablet_schema.column(variant_column_pos[i]);
+        if (column.is_variant_type()) {
+            // enable doc snapshot mode
+            if (column.variant_enable_doc_snapshot_mode()) {

Review Comment:
   Lines 144-164 are too complicated to understand and the logic is mixed; we 
need to refactor them to make the code clearer.



##########
be/src/olap/rowset/segment_v2/variant/variant_util.h:
##########
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "vec/json/json_parser.h"
+
+namespace doris {
+class TabletSchema;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::segment_v2::variant_util {
+
+// Parse variant columns by picking variant positions from `column_pos` and 
generating ParseConfig

Review Comment:
    and generating ParseConfig? 



##########
be/src/vec/columns/column_variant.h:
##########
@@ -279,6 +279,9 @@ class ColumnVariant final : public COWHelper<IColumn, 
ColumnVariant> {
     WrappedPtr serialized_sparse_column = ColumnMap::create(
             ColumnString::create(), ColumnString::create(), 
ColumnArray::ColumnOffsets::create());
 
+    WrappedPtr serialized_doc_snapshot_column = ColumnMap::create(

Review Comment:
   why not reuse `serialized_sparse_column` and rename it?



##########
be/src/vec/json/parse2column.cpp:
##########
@@ -165,35 +165,72 @@ void parse_json_to_variant(IColumn& column, const char* 
src, size_t length,
         check_paths.insert(check_paths.end(), paths.begin(), paths.end());
         
THROW_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths));
     }
-    for (size_t i = 0; i < paths.size(); ++i) {
-        FieldInfo field_info;
-        schema_util::get_field_info(values[i], &field_info);
-        if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
-            continue;
-        }
-        if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
-            if (paths[i].has_nested_part()) {
-                column_variant.add_nested_subcolumn(paths[i], field_info, 
old_num_rows);
-            } else {
-                column_variant.add_sub_column(paths[i], old_num_rows);
+    auto [doc_snapshot_data_paths, doc_snapshot_data_values] =
+            column_variant.get_doc_snapshot_data_paths_and_values();
+    auto& doc_snapshot_data_offsets = 
column_variant.serialized_doc_snapshot_column_offsets();
+    std::unordered_set<std::string> subcolumn_set;
+    if (config.parse_to_subcolumns) {

Review Comment:
   Better to use an enum instead of a boolean value, and to encapsulate the 
logic in a function with a meaningful name.



##########
be/src/vec/columns/column_variant.cpp:
##########
@@ -1496,9 +1567,81 @@ bool ColumnVariant::is_visible_root_value(size_t nrow) 
const {
             return false;
         }
     }
+
+    const auto& doc_snapshot_column_map =
+            assert_cast<const ColumnMap&>(*serialized_doc_snapshot_column);
+    // doc snapshot column is not empty
+    if (doc_snapshot_column_map.get_offsets()[nrow - 1] !=
+        doc_snapshot_column_map.get_offsets()[nrow]) {
+        return false;
+    }
     return !root->data.is_null_at(nrow);
 }
 
+void ColumnVariant::serialize_from_doc_snapshot_to_json_format(int64_t row_num,
+                                                               BufferWritable& 
output,

Review Comment:
   Too much duplicated code.



##########
be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp:
##########
@@ -830,6 +972,254 @@ Status VariantSubcolumnWriter::append_nullable(const 
uint8_t* null_map, const ui
     return Status::OK();
 }
 
+VariantCompactionDocSnapshotWriter::VariantCompactionDocSnapshotWriter(
+        const ColumnWriterOptions& opts, const TabletColumn* column, 
std::unique_ptr<Field> field)
+        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta) {
+    _opts = opts;
+    _tablet_column = column;
+    _column = vectorized::ColumnVariant::create(0);
+}
+
+Status VariantCompactionDocSnapshotWriter::init() {
+    return Status::OK();
+}
+
+Status VariantCompactionDocSnapshotWriter::append_data(const uint8_t** ptr, 
size_t num_rows) {
+    const auto* column = reinterpret_cast<const 
vectorized::VariantColumnData*>(*ptr);
+    const auto& src = *reinterpret_cast<const 
vectorized::ColumnVariant*>(column->column_data);
+    auto* dst_ptr = assert_cast<vectorized::ColumnVariant*>(_column.get());
+    // TODO: if direct write we could avoid copy
+    dst_ptr->insert_range_from(src, column->row_pos, num_rows);
+    return Status::OK();
+}
+
+Status VariantCompactionDocSnapshotWriter::finish() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    for (auto& column_writer : _subcolumn_writers) {
+        RETURN_IF_ERROR(column_writer->finish());
+    }
+    RETURN_IF_ERROR(_doc_snapshot_column_writer->finish());
+    return Status::OK();
+}
+Status VariantCompactionDocSnapshotWriter::write_data() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    for (auto& column_writer : _subcolumn_writers) {
+        RETURN_IF_ERROR(column_writer->write_data());
+    }
+    RETURN_IF_ERROR(_doc_snapshot_column_writer->write_data());
+    return Status::OK();
+}
+Status VariantCompactionDocSnapshotWriter::write_ordinal_index() {
+    assert(is_finalized());
+    for (auto& column_writer : _subcolumn_writers) {
+        RETURN_IF_ERROR(column_writer->write_ordinal_index());
+    }
+    RETURN_IF_ERROR(_doc_snapshot_column_writer->write_ordinal_index());
+    return Status::OK();
+}
+
+Status VariantCompactionDocSnapshotWriter::write_zone_map() {
+    assert(is_finalized());
+    for (int i = 0; i < _subcolumn_writers.size(); ++i) {
+        if (_subcolumn_opts[i].need_zone_map) {

Review Comment:
   Too much duplicated code across write_zone_map, write_inverted_index, ...



##########
regression-test/suites/variant_p0/ext_meta/test_variant_external_meta_edge_cases.groovy:
##########
@@ -34,7 +34,7 @@ suite("test_variant_external_meta_edge_cases", 
"nonConcurrent") {
     sql """
         CREATE TABLE test_empty_subcolumns (
             k bigint,
-            v variant
+            v variant<properties("variant_enable_doc_snapshot_mode" = "false")>

Review Comment:
   why add this



##########
be/src/olap/tablet_schema.h:
##########
@@ -294,6 +312,12 @@ class TabletColumn : public MetadataAdder<TabletColumn> {
             BeConsts::DEFAULT_VARIANT_MAX_SPARSE_COLUMN_STATS_SIZE;
     // default to 0, no shard
     int32_t _variant_sparse_hash_shard_count = 0;
+

Review Comment:
   Better to wrap all the variant-related properties in a structure.



##########
be/src/vec/json/parse2column.cpp:
##########
@@ -165,35 +165,72 @@ void parse_json_to_variant(IColumn& column, const char* 
src, size_t length,
         check_paths.insert(check_paths.end(), paths.begin(), paths.end());
         
THROW_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths));
     }
-    for (size_t i = 0; i < paths.size(); ++i) {
-        FieldInfo field_info;
-        schema_util::get_field_info(values[i], &field_info);
-        if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
-            continue;
-        }
-        if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
-            if (paths[i].has_nested_part()) {
-                column_variant.add_nested_subcolumn(paths[i], field_info, 
old_num_rows);
-            } else {
-                column_variant.add_sub_column(paths[i], old_num_rows);
+    auto [doc_snapshot_data_paths, doc_snapshot_data_values] =
+            column_variant.get_doc_snapshot_data_paths_and_values();
+    auto& doc_snapshot_data_offsets = 
column_variant.serialized_doc_snapshot_column_offsets();
+    std::unordered_set<std::string> subcolumn_set;
+    if (config.parse_to_subcolumns) {
+        for (size_t i = 0; i < paths.size(); ++i) {
+            FieldInfo field_info;
+            schema_util::get_field_info(values[i], &field_info);
+            if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
+                continue;
+            }
+            if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
+                if (paths[i].has_nested_part()) {
+                    column_variant.add_nested_subcolumn(paths[i], field_info, 
old_num_rows);
+                } else {
+                    column_variant.add_sub_column(paths[i], old_num_rows);
+                }
+            }
+            auto* subcolumn = column_variant.get_subcolumn(paths[i], i);
+            if (!subcolumn) {
+                throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to 
find sub column {}",
+                                       paths[i].get_path());
+            }
+            if (subcolumn->cur_num_of_defaults() > 0) {
+                
subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
+                subcolumn->reset_current_num_of_defaults();
+            }
+            if (subcolumn->size() != old_num_rows) {
+                throw doris::Exception(
+                        ErrorCode::INVALID_ARGUMENT,
+                        "subcolumn {} size missmatched, may contains 
duplicated entry",
+                        paths[i].get_path());
+            }
+            subcolumn->insert(std::move(values[i]), std::move(field_info));
+            if (subcolumn_set.contains(paths[i].get_path())) {
+                throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                                       "may contains duplicated entry : {}", 
paths[i].get_path());
+            }
+            subcolumn_set.insert(paths[i].get_path());
+            if (!paths[i].empty() && config.parse_to_doc_snapshot) {
+                subcolumn->serialize_to_sparse_column(doc_snapshot_data_paths, 
paths[i].get_path(),

Review Comment:
   when will parse_to_doc_snapshot and parse_to_subcolumns both be true?



##########
be/src/olap/tablet_schema.h:
##########
@@ -294,6 +312,12 @@ class TabletColumn : public MetadataAdder<TabletColumn> {
             BeConsts::DEFAULT_VARIANT_MAX_SPARSE_COLUMN_STATS_SIZE;
     // default to 0, no shard
     int32_t _variant_sparse_hash_shard_count = 0;
+
+    bool _variant_enable_doc_snapshot_mode = false;
+
+    int64_t _variant_doc_snapshot_min_rows = 0;
+
+    int32_t _variant_doc_snapshot_shard_count = 128;

Review Comment:
   What if `_variant_sparse_hash_shard_count` and 
`_variant_doc_snapshot_shard_count` are both specified at the same time?



##########
be/src/olap/rowset/segment_v2/variant/variant_util.cpp:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/variant/variant_util.h"
+
+#include <glog/logging.h>
+
+#include <memory>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/tablet_schema.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_variant.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/schema_util.h"
+#include "vec/data_types/data_type_jsonb.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_variant.h"
+#include "vec/json/parse2column.h"
+
+namespace doris::segment_v2::variant_util {
+
+namespace {
+
+Status _parse_variant_columns(vectorized::Block& block, const 
std::vector<uint32_t>& variant_pos,
+                              const std::vector<vectorized::ParseConfig>& 
configs) {
+    for (size_t i = 0; i < variant_pos.size(); ++i) {
+        auto column_ref = block.get_by_position(variant_pos[i]).column;
+        bool is_nullable = column_ref->is_nullable();
+        vectorized::MutableColumnPtr var_column = column_ref->assume_mutable();
+        if (is_nullable) {
+            const auto& nullable = assert_cast<const 
vectorized::ColumnNullable&>(*column_ref);
+            var_column = nullable.get_nested_column_ptr()->assume_mutable();
+        }
+        auto& var = assert_cast<vectorized::ColumnVariant&>(*var_column);
+        var_column->finalize();
+
+        vectorized::MutableColumnPtr variant_column;
+        if (var.is_doc_snapshot_mode()) {
+            // doc snapshot mode, we need to parse the doc snapshot column
+            vectorized::parse_binary_to_variant(var);
+            continue;
+        }
+        if (!var.is_scalar_variant()) {
+            // already parsed
+            continue;
+        }
+
+        VLOG_DEBUG << "parse scalar variant column: " << 
var.get_root_type()->get_name();
+        vectorized::ColumnPtr scalar_root_column;
+        if (var.get_root_type()->get_primitive_type() == TYPE_JSONB) {
+            // TODO more efficient way to parse jsonb type, currently we just 
convert jsonb to
+            // json str and parse them into variant
+            RETURN_IF_ERROR(vectorized::schema_util::cast_column(
+                    {var.get_root(), var.get_root_type(), ""},
+                    var.get_root()->is_nullable()
+                            ? 
make_nullable(std::make_shared<vectorized::DataTypeString>())
+                            : std::make_shared<vectorized::DataTypeString>(),
+                    &scalar_root_column));
+            if (scalar_root_column->is_nullable()) {
+                scalar_root_column =
+                        assert_cast<const 
vectorized::ColumnNullable*>(scalar_root_column.get())
+                                ->get_nested_column_ptr();
+            }
+        } else {
+            const auto& root = *var.get_root();
+            scalar_root_column = root.is_nullable()
+                                         ? assert_cast<const 
vectorized::ColumnNullable&>(root)
+                                                   .get_nested_column_ptr()
+                                         : var.get_root();
+        }
+
+        if (scalar_root_column->is_column_string()) {
+            variant_column = vectorized::ColumnVariant::create(0);
+            vectorized::parse_json_to_variant(
+                    *variant_column.get(),
+                    assert_cast<const 
vectorized::ColumnString&>(*scalar_root_column), configs[i]);
+        } else {
+            // Root maybe other types rather than string like 
ColumnVariant(Int32).
+            // In this case, we should finlize the root and cast to JSON type
+            auto expected_root_type =
+                    
make_nullable(std::make_shared<vectorized::ColumnVariant::MostCommonType>());
+            var.ensure_root_node_type(expected_root_type);
+            variant_column = var.assume_mutable();
+        }
+
+        // Wrap variant with nullmap if it is nullable
+        vectorized::ColumnPtr result = variant_column->get_ptr();
+        if (is_nullable) {
+            const auto& null_map = assert_cast<const 
vectorized::ColumnNullable&>(*column_ref)
+                                           .get_null_map_column_ptr();
+            result = vectorized::ColumnNullable::create(result, null_map);
+        }
+        block.get_by_position(variant_pos[i]).column = result;
+    }
+    return Status::OK();
+}
+
+} // namespace
+
+Status parse_variant_columns(vectorized::Block& block, const 
std::vector<uint32_t>& variant_pos,
+                             const std::vector<vectorized::ParseConfig>& 
configs) {
+    RETURN_IF_CATCH_EXCEPTION({ return _parse_variant_columns(block, 
variant_pos, configs); });
+}
+
+Status parse_variant_columns(vectorized::Block& block, const TabletSchema& 
tablet_schema,
+                             const std::vector<uint32_t>& column_pos) {
+    std::vector<uint32_t> variant_column_pos;
+    for (const auto& pos : column_pos) {
+        const auto& column = tablet_schema.column(pos);
+        if (column.is_variant_type()) {
+            variant_column_pos.push_back(pos);
+        }
+    }
+
+    if (variant_column_pos.empty()) {
+        return Status::OK();
+    }
+
+    std::vector<vectorized::ParseConfig> configs(variant_column_pos.size());
+    for (size_t i = 0; i < variant_column_pos.size(); ++i) {
+        configs[i].enable_flatten_nested = 
tablet_schema.variant_flatten_nested();
+        const auto& column = tablet_schema.column(variant_column_pos[i]);
+        if (column.is_variant_type()) {
+            // enable doc snapshot mode
+            if (column.variant_enable_doc_snapshot_mode()) {
+                // if has schema template, no need to parse to doc snapshot, 
when writing data, we
+                // will parse to doc snapshot

Review Comment:
   这里if else嵌套太深,可读性比较差



##########
be/src/olap/rowset/segment_v2/segment_iterator.cpp:
##########
@@ -2363,6 +2363,8 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
 
             RETURN_IF_ERROR(block->check_type_and_column());
 
+            LOG(INFO) << "block data: " << block->dump_data();

Review Comment:
   remove



##########
be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp:
##########
@@ -366,6 +388,114 @@ Status 
UnifiedSparseColumnWriter::append_from_variant(const vectorized::ColumnVa
     return Status::OK();
 }
 
+Status VariantDocSnapShotWriter::init(const TabletColumn* parent_column, int 
bucket_num,
+                                      int& column_id, const 
ColumnWriterOptions& opts,
+                                      SegmentFooterPB* footer) {
+    _bucket_num = bucket_num;
+    _first_column_id = column_id;
+    _doc_snapshot_column_writers.resize(_bucket_num);
+    _doc_snapshot_column_opts.resize(_bucket_num);
+    for (int b = 0; b < _bucket_num; ++b) {
+        const TabletColumn& bucket_column =
+                
vectorized::schema_util::create_doc_snapshot_column(*parent_column, b);
+        _doc_snapshot_column_opts[b] = opts;
+        _doc_snapshot_column_opts[b].meta = footer->add_columns();
+        _init_column_meta(_doc_snapshot_column_opts[b].meta, column_id, 
bucket_column,
+                          opts.compression_type);
+        
RETURN_IF_ERROR(ColumnWriter::create_map_writer(_doc_snapshot_column_opts[b],
+                                                        &bucket_column, 
opts.file_writer,
+                                                        
&_doc_snapshot_column_writers[b]));
+        RETURN_IF_ERROR(_doc_snapshot_column_writers[b]->init());
+        ++column_id;
+    }
+    return Status::OK();
+}
+
+Status VariantDocSnapShotWriter::append_data(const TabletColumn* parent_column,

Review Comment:
   Needs more comments; maybe some logic could be shared with 
UnifiedSparseColumnWriter.



##########
be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp:
##########
@@ -525,22 +655,27 @@ Status 
VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnVariant* p
     return Status::OK();
 }
 
-// Serialize and write Variant sparse data. Decides mode based on 
FE-configured bucket num,
-// initializes the corresponding writer(s), and delegates 
conversion/append/statistics to the
-// unified sparse writer. Column id consumption order remains identical to the 
previous logic.
 Status VariantColumnWriterImpl::_process_sparse_column(
         vectorized::ColumnVariant* ptr, vectorized::OlapBlockDataConvertor* 
converter,
         size_t num_rows, int& column_id) {
     int bucket_num = std::max(1, 
_tablet_column->variant_sparse_hash_shard_count());
-    if (bucket_num <= 1) {
-        TabletColumn sparse_column = 
vectorized::schema_util::create_sparse_column(*_tablet_column);
-        RETURN_IF_ERROR(_sparse_writer.init_single(sparse_column, column_id, 
_opts, _opts.footer));
-    } else {
-        RETURN_IF_ERROR(_sparse_writer.init_buckets(bucket_num, 
*_tablet_column, column_id, _opts,
-                                                    _opts.footer));
+    RETURN_IF_ERROR(
+            _sparse_writer.init(_tablet_column, bucket_num, column_id, _opts, 
_opts.footer));
+    RETURN_IF_ERROR(_sparse_writer.append_data(_tablet_column, *ptr, num_rows, 
converter));
+    return Status::OK();
+}
+
+Status VariantColumnWriterImpl::_process_doc_snapshot_column(
+        vectorized::ColumnVariant* ptr, vectorized::OlapBlockDataConvertor* 
converter,
+        size_t num_rows, int& column_id) {
+    if (!_tablet_column->variant_enable_doc_snapshot_mode()) {
+        return Status::OK();
     }
-    RETURN_IF_ERROR(_sparse_writer.append_from_variant(*ptr, num_rows, 
converter, *_tablet_column,
-                                                       &_statistics));
+    ptr->reconstruct_doc_snapshot_column();

Review Comment:
   Why is `reconstruct_doc_snapshot_column` needed here? Schema change? Better 
to add a comment.



##########
be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp:
##########
@@ -366,6 +388,114 @@ Status 
UnifiedSparseColumnWriter::append_from_variant(const vectorized::ColumnVa
     return Status::OK();
 }
 
+Status VariantDocSnapShotWriter::init(const TabletColumn* parent_column, int 
bucket_num,

Review Comment:
   VariantDocumentWriter, 加上snapshot太难理解了,然后加注释描述一下document的概念



##########
fe/fe-common/src/main/java/org/apache/doris/catalog/VariantType.java:
##########
@@ -52,6 +52,15 @@ public class VariantType extends ScalarType {
     @SerializedName(value = "variantSparseHashShardCount")
     private final int variantSparseHashShardCount;
 
+    @SerializedName(value = "enableVariantDocSnapshotMode")

Review Comment:
   enable_doc_mode?



##########
be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp:
##########
@@ -710,6 +735,18 @@ Status VariantColumnReader::_build_read_plan(ReadPlan* 
plan, const TabletColumn&
             plan->root = root;
             return Status::OK();
         }
+
+        // find if path exists in doc snapshot column
+        auto picked_doc_snapshot_buckets =
+                _pick_doc_snapshot_column_buckets(relative_path.get_path());
+        if (!picked_doc_snapshot_buckets.empty()) {
+            plan->kind = ReadKind::DOC_SNAPSHOT_EXTRACT;
+            plan->type = create_variant_type(target_col);

Review Comment:
   应该把extract和层级读分开实现, 读root实际上也是层级读(所有bucket的特例)。extract可以做一些特殊的优化



##########
be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp:
##########
@@ -776,21 +813,56 @@ Status VariantColumnReader::_create_iterator_from_plan(
         }
         return Status::OK();
     }
+    case ReadKind::DOC_SNAPSHOT: {
+        DCHECK(plan.doc_snapshot_buckets.size() == 1);
+        ColumnIteratorUPtr inner_iter;
+        
RETURN_IF_ERROR(_doc_snapshot_column_readers.at(plan.doc_snapshot_buckets[0])
+                                ->new_iterator(&inner_iter, nullptr));
+        *iterator = 
std::make_unique<VariantDocSnapshotCompactIterator>(std::move(inner_iter));
+        return Status::OK();
+    }
+    case ReadKind::DOC_SNAPSHOT_EXTRACT: {
+        DCHECK(plan.doc_snapshot_buckets.size() >= 1);
+
+        std::vector<BinaryColumnCacheSPtr> doc_snapshot_column_caches;
+        for (const auto& bucket : plan.doc_snapshot_buckets) {
+            std::string path = DOC_SNAPSHOT_COLUMN_PATH + "." + 
std::to_string(bucket);
+            BinaryColumnCacheSPtr doc_snapshot_column_cache = 
DORIS_TRY(_get_binary_column_cache(
+                    binary_column_cache_ptr, path, 
_doc_snapshot_column_readers.at(bucket)));
+            
doc_snapshot_column_caches.push_back(std::move(doc_snapshot_column_cache));
+        }
+        *iterator = std::make_unique<VariantDocSnapshotPathIterator>(

Review Comment:
   应该把extract和层级读分开实现, 读root实际上也是层级读(所有bucket的特例)。extract可以做一些特殊的优化



##########
be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp:
##########
@@ -710,6 +735,18 @@ Status VariantColumnReader::_build_read_plan(ReadPlan* 
plan, const TabletColumn&
             plan->root = root;
             return Status::OK();
         }
+
+        // find if path exists in doc snapshot column
+        auto picked_doc_snapshot_buckets =
+                _pick_doc_snapshot_column_buckets(relative_path.get_path());
+        if (!picked_doc_snapshot_buckets.empty()) {
+            plan->kind = ReadKind::DOC_SNAPSHOT_EXTRACT;

Review Comment:
   
这里相当于是merge的方式去merge不同bucket的前缀而不是extract,加上注释和example,extract可能不合适,叫DOC_BUILD_PATH?



##########
be/src/vec/json/parse2column.cpp:
##########
@@ -165,35 +165,77 @@ void parse_json_to_variant(IColumn& column, const char* 
src, size_t length,
         check_paths.insert(check_paths.end(), paths.begin(), paths.end());
         
THROW_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths));
     }
-    for (size_t i = 0; i < paths.size(); ++i) {
-        FieldInfo field_info;
-        schema_util::get_field_info(values[i], &field_info);
-        if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
-            continue;
-        }
-        if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
-            if (paths[i].has_nested_part()) {
-                column_variant.add_nested_subcolumn(paths[i], field_info, 
old_num_rows);
-            } else {
-                column_variant.add_sub_column(paths[i], old_num_rows);
+    auto [doc_snapshot_data_paths, doc_snapshot_data_values] =
+            column_variant.get_doc_snapshot_data_paths_and_values();
+    auto& doc_snapshot_data_offsets = 
column_variant.serialized_doc_snapshot_column_offsets();
+    std::unordered_set<std::string> subcolumn_set;
+    if (config.parse_to_subcolumns) {
+        for (size_t i = 0; i < paths.size(); ++i) {
+            FieldInfo field_info;
+            schema_util::get_field_info(values[i], &field_info);
+            if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
+                continue;
+            }
+            if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
+                if (paths[i].has_nested_part()) {
+                    column_variant.add_nested_subcolumn(paths[i], field_info, 
old_num_rows);
+                } else {
+                    column_variant.add_sub_column(paths[i], old_num_rows);
+                }
+            }
+            auto* subcolumn = column_variant.get_subcolumn(paths[i], i);
+            if (!subcolumn) {
+                throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to 
find sub column {}",
+                                       paths[i].get_path());
+            }
+            if (subcolumn->cur_num_of_defaults() > 0) {
+                
subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
+                subcolumn->reset_current_num_of_defaults();
+            }
+            if (subcolumn->size() != old_num_rows) {
+                throw doris::Exception(
+                        ErrorCode::INVALID_ARGUMENT,
+                        "subcolumn {} size missmatched, may contains 
duplicated entry",
+                        paths[i].get_path());
+            }
+            subcolumn->insert(std::move(values[i]), std::move(field_info));
+            if (subcolumn_set.contains(paths[i].get_path())) {
+                throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                                       "may contains duplicated entry : {}", 
paths[i].get_path());
+            }
+            subcolumn_set.insert(paths[i].get_path());
+            if (!paths[i].empty() && config.parse_to_doc_snapshot) {
+                subcolumn->serialize_to_binary_column(doc_snapshot_data_paths, 
paths[i].get_path(),
+                                                      
doc_snapshot_data_values, old_num_rows);
             }
         }
-        auto* subcolumn = column_variant.get_subcolumn(paths[i], i);
-        if (!subcolumn) {
-            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to 
find sub column {}",
-                                   paths[i].get_path());
-        }
-        if (subcolumn->cur_num_of_defaults() > 0) {
-            subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
-            subcolumn->reset_current_num_of_defaults();
-        }
-        if (subcolumn->size() != old_num_rows) {
-            throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
-                                   "subcolumn {} size missmatched, may 
contains duplicated entry",
-                                   paths[i].get_path());
+    } else {
+        CHECK(config.parse_to_doc_snapshot);
+        for (size_t i = 0; i < paths.size(); ++i) {
+            FieldInfo field_info;
+            schema_util::get_field_info(values[i], &field_info);
+            if (subcolumn_set.contains(paths[i].get_path())) {
+                throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
+                                       "may contains duplicated entry : {}", 
paths[i].get_path());
+            }
+            subcolumn_set.insert(paths[i].get_path());
+            if (paths[i].empty()) {
+                auto* subcolumn = column_variant.get_subcolumn(paths[i], i);
+                DCHECK(subcolumn != nullptr);
+                if (subcolumn->cur_num_of_defaults() > 0) {
+                    
subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
+                    subcolumn->reset_current_num_of_defaults();
+                }
+                subcolumn->insert(std::move(values[i]), std::move(field_info));
+                continue;
+            }
+            ColumnVariant::Subcolumn tmp_subcolumn(0, true);

Review Comment:
   need to reuse tmp_subcolumn



##########
fe/fe-common/src/main/java/org/apache/doris/catalog/VariantType.java:
##########
@@ -52,6 +52,15 @@ public class VariantType extends ScalarType {
     @SerializedName(value = "variantSparseHashShardCount")
     private final int variantSparseHashShardCount;
 
+    @SerializedName(value = "enableVariantDocSnapshotMode")
+    private final boolean enableVariantDocSnapshotMode;
+
+    @SerializedName(value = "variantDocSnapshotMinRows")

Review Comment:
   doc_shredding_min_rows?



##########
be/src/olap/rowset/segment_v2/variant/variant_doc_snapshot_iterator.cpp:
##########
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/variant/variant_doc_snapshot_iterator.h"
+
+#include <algorithm>
+#include <tuple>
+#include <unordered_map>
+#include <utility>
+
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/string_ref.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/json/path_in_data.h"
+
+namespace doris::segment_v2 {
+
+#include "common/compile_check_begin.h"
+
+VariantDocSnapshotIteratorBase::VariantDocSnapshotIteratorBase(
+        std::vector<BinaryColumnCacheSPtr>&& doc_snapshot_column_caches)
+        : _doc_snapshot_column_caches(std::move(doc_snapshot_column_caches)) {}
+
+Status VariantDocSnapshotIteratorBase::init(const ColumnIteratorOptions& opts) 
{
+    for (const auto& cache : _doc_snapshot_column_caches) {
+        RETURN_IF_ERROR(cache->init(opts));
+    }
+    return Status::OK();
+}
+
+Status VariantDocSnapshotIteratorBase::seek_to_ordinal(ordinal_t ord) {
+    for (const auto& cache : _doc_snapshot_column_caches) {
+        RETURN_IF_ERROR(cache->seek_to_ordinal(ord));
+    }
+    return Status::OK();
+}
+
+ordinal_t VariantDocSnapshotIteratorBase::get_current_ordinal() const {
+    DCHECK(!_doc_snapshot_column_caches.empty());
+    return _doc_snapshot_column_caches[0]->get_current_ordinal();
+}
+
+VariantDocSnapshotRootIterator::VariantDocSnapshotRootIterator(
+        std::vector<BinaryColumnCacheSPtr>&& doc_snapshot_column_caches,
+        std::unique_ptr<SubstreamIterator>&& root_reader)
+        : 
VariantDocSnapshotIteratorBase(std::move(doc_snapshot_column_caches)),
+          _root_reader(std::move(root_reader)) {}
+
+Status VariantDocSnapshotRootIterator::init(const ColumnIteratorOptions& opts) 
{
+    RETURN_IF_ERROR(VariantDocSnapshotIteratorBase::init(opts));
+    DCHECK(_root_reader);
+    RETURN_IF_ERROR(_root_reader->iterator->init(opts));
+    return Status::OK();
+}
+
+Status VariantDocSnapshotRootIterator::seek_to_ordinal(ordinal_t ord) {
+    RETURN_IF_ERROR(VariantDocSnapshotIteratorBase::seek_to_ordinal(ord));
+    DCHECK(_root_reader);
+    RETURN_IF_ERROR(_root_reader->iterator->seek_to_ordinal(ord));
+    return Status::OK();
+}
+
+Status VariantDocSnapshotRootIterator::next_batch(size_t* n, 
vectorized::MutableColumnPtr& dst,
+                                                  bool* has_null) {
+    DCHECK(_root_reader);
+    RETURN_IF_ERROR(_root_reader->iterator->next_batch(n, 
_root_reader->column, has_null));
+    std::vector<vectorized::ColumnPtr> doc_snapshot_data_buckets;
+    RETURN_IF_ERROR(_collect_doc_snapshot_data(
+            [&](BinaryColumnCache* cache) { return cache->next_batch(n, 
has_null); },
+            &doc_snapshot_data_buckets));
+
+    size_t num_rows = doc_snapshot_data_buckets.empty() ? 0 : 
doc_snapshot_data_buckets[0]->size();
+    if (n != nullptr) {
+        *n = num_rows;
+    }
+    RETURN_IF_ERROR(_merge_doc_snapshot_into_variant(dst, 
doc_snapshot_data_buckets, num_rows));
+    _root_reader->column->clear();
+    return Status::OK();
+}
+
+Status VariantDocSnapshotRootIterator::read_by_rowids(const rowid_t* rowids, 
const size_t count,
+                                                      
vectorized::MutableColumnPtr& dst) {
+    DCHECK(_root_reader);
+    RETURN_IF_ERROR(_root_reader->iterator->read_by_rowids(rowids, count, 
_root_reader->column));
+    std::vector<vectorized::ColumnPtr> doc_snapshot_data_buckets;
+    RETURN_IF_ERROR(_collect_doc_snapshot_data(
+            [&](BinaryColumnCache* cache) { return 
cache->read_by_rowids(rowids, count); },
+            &doc_snapshot_data_buckets));
+
+    size_t num_rows = doc_snapshot_data_buckets.empty() ? 0 : 
doc_snapshot_data_buckets[0]->size();
+    RETURN_IF_ERROR(_merge_doc_snapshot_into_variant(dst, 
doc_snapshot_data_buckets, num_rows));
+    _root_reader->column->clear();
+    return Status::OK();
+}
+
+Status VariantDocSnapshotRootIterator::_merge_doc_snapshot_into_variant(
+        vectorized::MutableColumnPtr& dst,
+        const std::vector<vectorized::ColumnPtr>& doc_snapshot_data_buckets,
+        size_t num_rows) const {
+    using namespace vectorized;
+    ColumnNullable* nullable_column = nullptr;
+    if (dst->is_nullable()) {
+        nullable_column = assert_cast<ColumnNullable*>(dst.get());
+    }
+    auto& variant = nullable_column == nullptr
+                            ? assert_cast<ColumnVariant&>(*dst)
+                            : 
assert_cast<ColumnVariant&>(nullable_column->get_nested_column());
+    NullMap* null_map = nullable_column ? 
&nullable_column->get_null_map_data() : nullptr;
+
+    std::vector<const ColumnString*> src_doc_snapshot_data_paths_buckets(
+            doc_snapshot_data_buckets.size());
+    std::vector<const ColumnString*> src_doc_snapshot_data_values_buckets(
+            doc_snapshot_data_buckets.size());
+    std::vector<const ColumnArray::Offsets64*> 
src_doc_snapshot_data_offsets_buckets(
+            doc_snapshot_data_buckets.size());
+    for (size_t i = 0; i != doc_snapshot_data_buckets.size(); ++i) {
+        const auto& src_map =
+                assert_cast<const 
vectorized::ColumnMap&>(*doc_snapshot_data_buckets[i]);
+        src_doc_snapshot_data_paths_buckets[i] =
+                assert_cast<const 
vectorized::ColumnString*>(&src_map.get_keys());
+        src_doc_snapshot_data_values_buckets[i] =
+                assert_cast<const 
vectorized::ColumnString*>(&src_map.get_values());
+        src_doc_snapshot_data_offsets_buckets[i] =
+                assert_cast<const 
vectorized::ColumnArray::Offsets64*>(&src_map.get_offsets());
+    }
+
+    CHECK(_root_reader);
+    MutableColumnPtr root_column = _root_reader->column->get_ptr();
+    DCHECK(root_column->size() == num_rows);
+    auto root_nullable_column = make_nullable(root_column->get_ptr());
+    auto root_type = make_nullable(_root_reader->type);
+    MutableColumnPtr container = 
ColumnVariant::create(variant.max_subcolumns_count(), root_type,
+                                                       
root_nullable_column->assume_mutable());
+    auto& container_variant = assert_cast<ColumnVariant&>(*container);
+    vectorized::MutableColumnPtr doc_snapshot_column =
+            vectorized::ColumnVariant::create_binary_column_fn();
+
+    auto& column_map = assert_cast<ColumnMap&>(*doc_snapshot_column);
+    auto& dst_doc_snapshot_data_paths =
+            assert_cast<vectorized::ColumnString&>(column_map.get_keys());
+    auto& dst_doc_snapshot_data_values =
+            assert_cast<vectorized::ColumnString&>(column_map.get_values());
+    auto& dst_doc_snapshot_data_offsets =
+            
assert_cast<vectorized::ColumnArray::Offsets64&>(column_map.get_offsets());
+    for (size_t i = 0; i != num_rows; ++i) {
+        std::vector<std::tuple<std::string_view, size_t, size_t>> all_paths;

Review Comment:
   Move `all_paths` before line 162 and reuse it.



##########
be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.h:
##########
@@ -148,9 +176,7 @@ class VariantColumnWriterImpl {
     UnifiedSparseColumnWriter _sparse_writer;
     std::vector<std::unique_ptr<ColumnWriter>> _subcolumn_writers;
     std::vector<ColumnWriterOptions> _subcolumn_opts;
-
-    // staticstics which will be persisted in the footer
-    VariantStatistics _statistics;
+    VariantDocSnapShotWriter _doc_snapshot_writer;

Review Comment:
   Maybe unify `_sparse_writer` and `_doc_snapshot_writer` into a single 
`_binary_writer` (VariantBinaryWriter); the two structures are mutually exclusive.



##########
be/src/olap/rowset/segment_v2/variant/variant_util.cpp:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/variant/variant_util.h"
+
+#include <glog/logging.h>
+
+#include <memory>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/tablet_schema.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_variant.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/schema_util.h"
+#include "vec/data_types/data_type_jsonb.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_variant.h"
+#include "vec/json/parse2column.h"
+
+namespace doris::segment_v2::variant_util {
+
+namespace {
+
+Status _parse_variant_columns(vectorized::Block& block, const 
std::vector<uint32_t>& variant_pos,
+                              const std::vector<vectorized::ParseConfig>& 
configs) {
+    for (size_t i = 0; i < variant_pos.size(); ++i) {
+        auto column_ref = block.get_by_position(variant_pos[i]).column;
+        bool is_nullable = column_ref->is_nullable();
+        vectorized::MutableColumnPtr var_column = column_ref->assume_mutable();
+        if (is_nullable) {
+            const auto& nullable = assert_cast<const 
vectorized::ColumnNullable&>(*column_ref);
+            var_column = nullable.get_nested_column_ptr()->assume_mutable();
+        }
+        auto& var = assert_cast<vectorized::ColumnVariant&>(*var_column);
+        var_column->finalize();
+
+        vectorized::MutableColumnPtr variant_column;
+        if (var.is_doc_snapshot_mode()) {
+            // doc snapshot mode, we need to parse the doc snapshot column

Review Comment:
   什么情况会走到这里, need comments



##########
be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.h:
##########
@@ -148,9 +176,7 @@ class VariantColumnWriterImpl {
     UnifiedSparseColumnWriter _sparse_writer;
     std::vector<std::unique_ptr<ColumnWriter>> _subcolumn_writers;
     std::vector<ColumnWriterOptions> _subcolumn_opts;
-
-    // staticstics which will be persisted in the footer
-    VariantStatistics _statistics;
+    VariantDocSnapShotWriter _doc_snapshot_writer;

Review Comment:
   _process_doc_snapshot_column and _process_sparse_column could be unified too



##########
be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp:
##########
@@ -830,6 +972,254 @@ Status VariantSubcolumnWriter::append_nullable(const 
uint8_t* null_map, const ui
     return Status::OK();
 }
 
+VariantCompactionDocSnapshotWriter::VariantCompactionDocSnapshotWriter(
+        const ColumnWriterOptions& opts, const TabletColumn* column, 
std::unique_ptr<Field> field)
+        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta) {
+    _opts = opts;
+    _tablet_column = column;
+    _column = vectorized::ColumnVariant::create(0);
+}
+
+Status VariantCompactionDocSnapshotWriter::init() {
+    return Status::OK();
+}
+
+Status VariantCompactionDocSnapshotWriter::append_data(const uint8_t** ptr, 
size_t num_rows) {
+    const auto* column = reinterpret_cast<const 
vectorized::VariantColumnData*>(*ptr);
+    const auto& src = *reinterpret_cast<const 
vectorized::ColumnVariant*>(column->column_data);
+    auto* dst_ptr = assert_cast<vectorized::ColumnVariant*>(_column.get());
+    // TODO: if direct write we could avoid copy
+    dst_ptr->insert_range_from(src, column->row_pos, num_rows);
+    return Status::OK();
+}
+
+Status VariantCompactionDocSnapshotWriter::finish() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    for (auto& column_writer : _subcolumn_writers) {
+        RETURN_IF_ERROR(column_writer->finish());
+    }
+    RETURN_IF_ERROR(_doc_snapshot_column_writer->finish());
+    return Status::OK();
+}
+Status VariantCompactionDocSnapshotWriter::write_data() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    for (auto& column_writer : _subcolumn_writers) {
+        RETURN_IF_ERROR(column_writer->write_data());
+    }
+    RETURN_IF_ERROR(_doc_snapshot_column_writer->write_data());
+    return Status::OK();
+}
+Status VariantCompactionDocSnapshotWriter::write_ordinal_index() {
+    assert(is_finalized());
+    for (auto& column_writer : _subcolumn_writers) {
+        RETURN_IF_ERROR(column_writer->write_ordinal_index());
+    }
+    RETURN_IF_ERROR(_doc_snapshot_column_writer->write_ordinal_index());
+    return Status::OK();
+}
+
+Status VariantCompactionDocSnapshotWriter::write_zone_map() {
+    assert(is_finalized());
+    for (int i = 0; i < _subcolumn_writers.size(); ++i) {
+        if (_subcolumn_opts[i].need_zone_map) {
+            RETURN_IF_ERROR(_subcolumn_writers[i]->write_zone_map());
+        }
+    }
+    RETURN_IF_ERROR(_doc_snapshot_column_writer->write_zone_map());
+
+    return Status::OK();
+}
+Status VariantCompactionDocSnapshotWriter::write_inverted_index() {
+    assert(is_finalized());
+    for (int i = 0; i < _subcolumn_writers.size(); ++i) {
+        if (_subcolumn_opts[i].need_inverted_index) {
+            RETURN_IF_ERROR(_subcolumn_writers[i]->write_inverted_index());
+        }
+    }
+    RETURN_IF_ERROR(_doc_snapshot_column_writer->write_inverted_index());
+    return Status::OK();
+}
+Status VariantCompactionDocSnapshotWriter::write_bloom_filter_index() {
+    assert(is_finalized());
+    for (int i = 0; i < _subcolumn_writers.size(); ++i) {
+        if (_subcolumn_opts[i].need_bloom_filter) {
+            RETURN_IF_ERROR(_subcolumn_writers[i]->write_bloom_filter_index());
+        }
+    }
+    RETURN_IF_ERROR(_doc_snapshot_column_writer->write_bloom_filter_index());
+    return Status::OK();
+}
+Status VariantCompactionDocSnapshotWriter::append_nullable(const uint8_t* 
null_map,
+                                                           const uint8_t** 
ptr, size_t num_rows) {
+    RETURN_IF_ERROR(append_data(ptr, num_rows));
+    return Status::OK();
+}
+Status VariantCompactionDocSnapshotWriter::finalize() {
+    auto* variant_column = 
assert_cast<vectorized::ColumnVariant*>(_column.get());
+
+    const auto& parent_column =
+            
_opts.rowset_ctx->tablet_schema->column_by_uid(_tablet_column->parent_unique_id());
+
+    size_t num_rows = variant_column->size();
+    auto converter = std::make_unique<vectorized::OlapBlockDataConvertor>();
+    int column_id = 0;
+    int64_t variant_doc_snapshot_min_rows = 
parent_column.variant_doc_snapshot_min_rows();
+    if (variant_doc_snapshot_min_rows == 0 ||

Review Comment:
   `variant_doc_snapshot_min_rows == 0` is redundant and could be removed?



##########
be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp:
##########
@@ -776,21 +813,56 @@ Status VariantColumnReader::_create_iterator_from_plan(
         }
         return Status::OK();
     }
+    case ReadKind::DOC_SNAPSHOT: {
+        DCHECK(plan.doc_snapshot_buckets.size() == 1);
+        ColumnIteratorUPtr inner_iter;
+        
RETURN_IF_ERROR(_doc_snapshot_column_readers.at(plan.doc_snapshot_buckets[0])
+                                ->new_iterator(&inner_iter, nullptr));
+        *iterator = 
std::make_unique<VariantDocSnapshotCompactIterator>(std::move(inner_iter));
+        return Status::OK();
+    }
+    case ReadKind::DOC_SNAPSHOT_EXTRACT: {
+        DCHECK(plan.doc_snapshot_buckets.size() >= 1);
+
+        std::vector<BinaryColumnCacheSPtr> doc_snapshot_column_caches;
+        for (const auto& bucket : plan.doc_snapshot_buckets) {
+            std::string path = DOC_SNAPSHOT_COLUMN_PATH + "." + 
std::to_string(bucket);
+            BinaryColumnCacheSPtr doc_snapshot_column_cache = 
DORIS_TRY(_get_binary_column_cache(
+                    binary_column_cache_ptr, path, 
_doc_snapshot_column_readers.at(bucket)));
+            
doc_snapshot_column_caches.push_back(std::move(doc_snapshot_column_cache));
+        }
+        *iterator = std::make_unique<VariantDocSnapshotPathIterator>(

Review Comment:
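   Maybe the extract path could share the same implementation as the sparse
   extract iterator; that way we would not need two sets of code, and
   optimizations would be easier to make.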



##########
fe/fe-common/src/main/java/org/apache/doris/catalog/VariantType.java:
##########
@@ -52,6 +52,15 @@ public class VariantType extends ScalarType {
     @SerializedName(value = "variantSparseHashShardCount")
     private final int variantSparseHashShardCount;
 
+    @SerializedName(value = "enableVariantDocSnapshotMode")
+    private final boolean enableVariantDocSnapshotMode;
+
+    @SerializedName(value = "variantDocSnapshotMinRows")
+    private final long variantDocSnapshotMinRows;
+
+    @SerializedName(value = "variantDocSnapshotShardCount")

Review Comment:
   How about naming this `doc_hash_shard_count`?



##########
be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp:
##########
@@ -776,21 +813,56 @@ Status VariantColumnReader::_create_iterator_from_plan(
         }
         return Status::OK();
     }
+    case ReadKind::DOC_SNAPSHOT: {
+        DCHECK(plan.doc_snapshot_buckets.size() == 1);
+        ColumnIteratorUPtr inner_iter;
+        
RETURN_IF_ERROR(_doc_snapshot_column_readers.at(plan.doc_snapshot_buckets[0])
+                                ->new_iterator(&inner_iter, nullptr));
+        *iterator = 
std::make_unique<VariantDocSnapshotCompactIterator>(std::move(inner_iter));
+        return Status::OK();
+    }
+    case ReadKind::DOC_SNAPSHOT_EXTRACT: {
+        DCHECK(plan.doc_snapshot_buckets.size() >= 1);
+
+        std::vector<BinaryColumnCacheSPtr> doc_snapshot_column_caches;
+        for (const auto& bucket : plan.doc_snapshot_buckets) {
+            std::string path = DOC_SNAPSHOT_COLUMN_PATH + "." + 
std::to_string(bucket);
+            BinaryColumnCacheSPtr doc_snapshot_column_cache = 
DORIS_TRY(_get_binary_column_cache(
+                    binary_column_cache_ptr, path, 
_doc_snapshot_column_readers.at(bucket)));
+            
doc_snapshot_column_caches.push_back(std::move(doc_snapshot_column_cache));
+        }
+        *iterator = std::make_unique<VariantDocSnapshotPathIterator>(

Review Comment:
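   So we only need a separate implementation for the hierarchical read of the
   doc column, and the extract implementation can be unified: both are binary
   extract reads.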



##########
be/src/vec/columns/column_variant.h:
##########
@@ -279,6 +279,9 @@ class ColumnVariant final : public COWHelper<IColumn, 
ColumnVariant> {
     WrappedPtr serialized_sparse_column = ColumnMap::create(
             ColumnString::create(), ColumnString::create(), 
ColumnArray::ColumnOffsets::create());
 
+    WrappedPtr serialized_doc_snapshot_column = ColumnMap::create(

Review Comment:
   Maybe renaming this to `unshredded_binary_value` would be better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to