This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 217eac790bf [pick](Variant) pick some refactor and fix #34925 #36317 
#36201 #36793 (#37526)
217eac790bf is described below

commit 217eac790bfef127d2be80d47b05714a1941cd6b
Author: lihangyu <[email protected]>
AuthorDate: Thu Jul 11 21:25:34 2024 +0800

    [pick](Variant) pick some refactor and fix #34925 #36317 #36201 #36793 
(#37526)
---
 be/src/olap/base_tablet.cpp                        |   2 +-
 be/src/olap/rowset/beta_rowset_writer.cpp          |  16 +-
 be/src/olap/rowset/rowset.cpp                      |  16 ++
 be/src/olap/rowset/rowset.h                        |   2 +
 be/src/olap/rowset/rowset_meta.cpp                 |   5 +
 be/src/olap/rowset/rowset_writer_context.h         |   3 +-
 be/src/olap/rowset/segment_creator.cpp             |  96 ++------
 be/src/olap/rowset/segment_creator.h               |   8 +-
 .../rowset/segment_v2/hierarchical_data_reader.cpp |   5 +-
 .../rowset/segment_v2/hierarchical_data_reader.h   |  29 +--
 be/src/olap/rowset/segment_v2/segment.cpp          |  29 +--
 .../rowset/segment_v2/vertical_segment_writer.cpp  | 148 ++++++++++++-
 .../rowset/segment_v2/vertical_segment_writer.h    |  11 +-
 be/src/olap/rowset_builder.cpp                     |  14 +-
 be/src/olap/schema_change.cpp                      |   3 +-
 be/src/olap/tablet.cpp                             |  13 +-
 be/src/olap/tablet_schema.cpp                      |   9 +-
 be/src/olap/tablet_schema.h                        |   2 +-
 be/src/vec/columns/column_object.cpp               |  19 +-
 be/src/vec/columns/column_object.h                 |   8 -
 be/src/vec/common/schema_util.cpp                  | 246 +++++----------------
 be/src/vec/common/schema_util.h                    |  21 +-
 be/src/vec/data_types/data_type.h                  |   4 +
 .../data_types/serde/data_type_object_serde.cpp    |  33 ++-
 .../vec/data_types/serde/data_type_object_serde.h  |   9 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp          |   2 +-
 be/src/vec/functions/function_variant_element.cpp  |  43 +++-
 be/src/vec/olap/olap_data_convertor.cpp            |  34 ++-
 be/src/vec/olap/olap_data_convertor.h              |  15 +-
 .../java/org/apache/doris/analysis/UpdateStmt.java |  11 +-
 .../trees/plans/commands/UpdateCommand.java        |   2 +-
 regression-test/data/variant_p0/delete_update.out  |  11 +-
 .../data/variant_p0/partial_update_parallel1.csv   |   5 +
 .../data/variant_p0/partial_update_parallel2.csv   |   5 +
 .../data/variant_p0/partial_update_parallel3.csv   |   5 +
 .../data/variant_p0/partial_update_parallel4.csv   |   3 +
 .../data/variant_p0/variant_with_rowstore.out      |   9 +
 .../variant_github_events_p0_new/load.groovy       |  30 +++
 .../suites/variant_p0/delete_update.groovy         | 124 +++++++++--
 .../variant_p0/test_compaction_extract_root.groovy |  12 +-
 .../suites/variant_p0/variant_with_rowstore.groovy |  47 +++-
 41 files changed, 678 insertions(+), 431 deletions(-)

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 8901aee0e5a..c67c83bbec8 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -79,7 +79,7 @@ Status BaseTablet::update_by_least_common_schema(const 
TabletSchemaSPtr& update_
             {_max_version_schema, update_schema}, _max_version_schema, 
final_schema,
             check_column_size));
     _max_version_schema = final_schema;
-    VLOG_DEBUG << "dump updated tablet schema: " << 
final_schema->dump_structure();
+    VLOG_DEBUG << "dump updated tablet schema: " << 
final_schema->dump_full_schema();
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index de051eea45e..fd09f8b0a7e 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -600,13 +600,12 @@ Status BaseBetaRowsetWriter::build(RowsetSharedPtr& 
rowset) {
     }
 
     // update rowset meta tablet schema if tablet schema updated
-    if (_context.tablet_schema->num_variant_columns() > 0) {
-        _rowset_meta->set_tablet_schema(_context.tablet_schema);
-    }
+    auto rowset_schema = _context.merged_tablet_schema != nullptr ? 
_context.merged_tablet_schema
+                                                                  : 
_context.tablet_schema;
+    _rowset_meta->set_tablet_schema(rowset_schema);
 
     RETURN_NOT_OK_STATUS_WITH_WARN(
-            RowsetFactory::create_rowset(_context.tablet_schema, 
_context.rowset_dir, _rowset_meta,
-                                         &rowset),
+            RowsetFactory::create_rowset(rowset_schema, _context.rowset_dir, 
_rowset_meta, &rowset),
             "rowset init failed when build new rowset");
     _already_built = true;
     return Status::OK();
@@ -627,14 +626,17 @@ int64_t BetaRowsetWriter::_num_seg() const {
 void BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) 
{
     std::lock_guard<std::mutex> lock(*(_context.schema_lock));
     TabletSchemaSPtr update_schema;
+    if (_context.merged_tablet_schema == nullptr) {
+        _context.merged_tablet_schema = _context.tablet_schema;
+    }
     static_cast<void>(vectorized::schema_util::get_least_common_schema(
-            {_context.tablet_schema, flush_schema}, nullptr, update_schema));
+            {_context.merged_tablet_schema, flush_schema}, nullptr, 
update_schema));
     CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
             << "Rowset merge schema columns count is " << 
update_schema->num_columns()
             << ", but flush_schema is larger " << flush_schema->num_columns()
             << " update_schema: " << update_schema->dump_structure()
             << " flush_schema: " << flush_schema->dump_structure();
-    _context.tablet_schema.swap(update_schema);
+    _context.merged_tablet_schema.swap(update_schema);
     VLOG_DEBUG << "dump rs schema: " << 
_context.tablet_schema->dump_structure();
 }
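
The build()/update_rowset_schema changes above stop mutating _context.tablet_schema
in place and instead accumulate variant columns into a separate merged_tablet_schema,
seeded lazily from the base schema. A minimal standalone sketch of that bookkeeping,
with a schema reduced to a set of column names (an assumption for brevity; the real
get_least_common_schema also merges column types):

    // Sketch only: TabletSchemaSPtr is modeled as shared_ptr<set<string>>.
    #include <cassert>
    #include <memory>
    #include <set>
    #include <string>

    using Schema = std::set<std::string>;
    using SchemaPtr = std::shared_ptr<Schema>;

    struct Context {
        SchemaPtr tablet_schema;        // schema known at writer creation
        SchemaPtr merged_tablet_schema; // lazily seeded, grows per flush
    };

    void update_rowset_schema(Context& ctx, const Schema& flush_schema) {
        if (ctx.merged_tablet_schema == nullptr) {
            ctx.merged_tablet_schema = ctx.tablet_schema; // seed from base schema
        }
        auto update = std::make_shared<Schema>(*ctx.merged_tablet_schema);
        update->insert(flush_schema.begin(), flush_schema.end()); // simplified merge
        assert(update->size() >= flush_schema.size()); // mirrors the CHECK_GE above
        ctx.merged_tablet_schema.swap(update);
    }

    int main() {
        Context ctx;
        ctx.tablet_schema = std::make_shared<Schema>(Schema{"k", "v"});
        update_rowset_schema(ctx, {"k", "v", "v.a"}); // variant flush adds v.a
        update_rowset_schema(ctx, {"k", "v", "v.b"});
        assert(ctx.merged_tablet_schema->size() == 4); // k, v, v.a, v.b
        // build() then prefers merged_tablet_schema over tablet_schema
    }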
 
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index b5b68f4d38e..208d05f456c 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -23,6 +23,7 @@
 #include "olap/segment_loader.h"
 #include "olap/tablet_schema.h"
 #include "util/time.h"
+#include "vec/common/schema_util.h"
 
 namespace doris {
 
@@ -107,6 +108,21 @@ void Rowset::merge_rowset_meta(const RowsetMetaSharedPtr& 
other) {
     for (auto key_bound : key_bounds) {
         _rowset_meta->add_segment_key_bounds(key_bound);
     }
+
+    // In a partial update the rowset schema may be updated when the table contains
+    // a variant type, so we need the newest schema to be recorded here.
+    // Otherwise the schema is stale and leads to wrong data reads.
+    if (tablet_schema()->num_variant_columns() > 0) {
+        // merge extracted columns
+        TabletSchemaSPtr merged_schema;
+        static_cast<void>(vectorized::schema_util::get_least_common_schema(
+                {tablet_schema(), other->tablet_schema()}, nullptr, 
merged_schema));
+        if (*_schema != *merged_schema) {
+            _rowset_meta->set_tablet_schema(merged_schema);
+        }
+        // rowset_meta()->tablet_schema() may be updated, so make sure _schema is
+        // consistent with the rowset meta
+        _schema = _rowset_meta->tablet_schema();
+    }
 }
 
 void Rowset::clear_cache() {
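
A hedged sketch of the schema refresh in merge_rowset_meta, with the same
column-name-set stand-in for TabletSchema (the real merge also widens conflicting
types): the meta's schema is replaced only when the least common schema actually
differs.

    #include <cassert>
    #include <set>
    #include <string>

    using Schema = std::set<std::string>;

    // stand-in for get_least_common_schema over two schemas
    Schema least_common(const Schema& a, const Schema& b) {
        Schema out = a;
        out.insert(b.begin(), b.end());
        return out;
    }

    int main() {
        Schema mine {"k", "v", "v.a"};  // this rowset's schema
        Schema other {"k", "v", "v.b"}; // schema carried by the merged-in meta
        Schema merged = least_common(mine, other);
        if (merged != mine) {
            mine = merged; // set_tablet_schema(merged_schema) in the real code
        }
        assert(mine == (Schema{"k", "v", "v.a", "v.b"}));
    }
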
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 87cfe0b0bea..72c6c2fa29b 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -132,6 +132,8 @@ public:
 
     const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; }
 
+    void merge_rowset_meta(const RowsetMetaSharedPtr& other);
+
     bool is_pending() const { return _is_pending; }
 
     bool is_local() const { return _rowset_meta->is_local(); }
diff --git a/be/src/olap/rowset/rowset_meta.cpp 
b/be/src/olap/rowset/rowset_meta.cpp
index d8ef2e7b5dd..d37f5757064 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -17,6 +17,10 @@
 
 #include "olap/rowset/rowset_meta.h"
 
+#include <gen_cpp/olap_file.pb.h>
+
+#include <memory>
+
 #include "common/logging.h"
 #include "google/protobuf/util/message_differencer.h"
 #include "io/fs/local_file_system.h"
@@ -28,6 +32,7 @@
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_schema.h"
 #include "olap/tablet_schema_cache.h"
+#include "vec/common/schema_util.h"
 
 namespace doris {
 
diff --git a/be/src/olap/rowset/rowset_writer_context.h 
b/be/src/olap/rowset/rowset_writer_context.h
index 54be9f95970..ad82f6c491e 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -63,7 +63,8 @@ struct RowsetWriterContext {
     io::FileSystemSPtr fs;
     std::string rowset_dir;
     TabletSchemaSPtr tablet_schema;
-    TabletSchemaSPtr original_tablet_schema;
+    // for variant schema update
+    TabletSchemaSPtr merged_tablet_schema;
     // PREPARED/COMMITTED for pending rowset
     // VISIBLE for non-pending rowset
     RowsetStatePB rowset_state;
diff --git a/be/src/olap/rowset/segment_creator.cpp 
b/be/src/olap/rowset/segment_creator.cpp
index b968f684855..e42a80170a8 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -43,6 +43,8 @@
 #include "vec/common/schema_util.h" // variant column
 #include "vec/core/block.h"
 #include "vec/core/columns_with_type_and_name.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
 
 namespace doris {
 using namespace ErrorCode;
@@ -61,60 +63,38 @@ Status SegmentFlusher::flush_single_block(const 
vectorized::Block* block, int32_
     if (block->rows() == 0) {
         return Status::OK();
     }
-    // Expand variant columns
     vectorized::Block flush_block(*block);
-    TabletSchemaSPtr flush_schema;
     if (_context->write_type != DataWriteType::TYPE_COMPACTION &&
         _context->tablet_schema->num_variant_columns() > 0) {
-        RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, 
flush_schema));
+        RETURN_IF_ERROR(_parse_variant_columns(flush_block));
     }
     bool no_compression = flush_block.bytes() <= 
config::segment_compression_threshold_kb * 1024;
     if (config::enable_vertical_segment_writer &&
         _context->tablet_schema->cluster_key_idxes().empty()) {
         std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
-        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression, flush_schema));
+        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression));
         RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, 
flush_block.rows()));
-        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, 
flush_size));
+        RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), 
flush_size));
     } else {
         std::unique_ptr<segment_v2::SegmentWriter> writer;
-        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression, flush_schema));
+        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression));
         RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, 
flush_block.rows()));
-        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, 
flush_size));
+        RETURN_IF_ERROR(_flush_segment_writer(writer, nullptr, flush_size));
     }
     return Status::OK();
 }
 
-Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
-                                                     TabletSchemaSPtr& 
flush_schema) {
+Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
     size_t num_rows = block.rows();
     if (num_rows == 0) {
         return Status::OK();
     }
 
-    {
-        std::lock_guard<std::mutex> lock(*(_context->schema_lock));
-        // save original tablet schema, _context->tablet_schema maybe modified
-        if (_context->original_tablet_schema == nullptr) {
-            _context->original_tablet_schema = _context->tablet_schema;
-        }
-    }
-
     std::vector<int> variant_column_pos;
-    if (_context->partial_update_info && 
_context->partial_update_info->is_partial_update) {
-        // check columns that used to do partial updates should not include 
variant
-        for (int i : _context->partial_update_info->update_cids) {
-            const auto& col = *_context->original_tablet_schema->columns()[i];
-            if (!col.is_key() && col.name() != DELETE_SIGN) {
-                return Status::InvalidArgument(
-                        "Not implement partial update for variant only support 
delete currently");
-            }
-        }
-    } else {
-        // find positions of variant columns
-        for (int i = 0; i < 
_context->original_tablet_schema->columns().size(); ++i) {
-            if 
(_context->original_tablet_schema->columns()[i]->is_variant_type()) {
-                variant_column_pos.push_back(i);
-            }
+    for (int i = 0; i < block.columns(); ++i) {
+        const auto& entry = block.get_by_position(i);
+        if (vectorized::is_variant_type(remove_nullable(entry.type))) {
+            variant_column_pos.push_back(i);
         }
     }
 
@@ -123,37 +103,8 @@ Status 
SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
     }
 
     vectorized::schema_util::ParseContext ctx;
-    ctx.record_raw_json_column = 
_context->original_tablet_schema->store_row_column();
-    RETURN_IF_ERROR(vectorized::schema_util::parse_and_encode_variant_columns(
-            block, variant_column_pos, ctx));
-
-    flush_schema = std::make_shared<TabletSchema>();
-    flush_schema->copy_from(*_context->original_tablet_schema);
-    vectorized::Block flush_block(std::move(block));
-    vectorized::schema_util::rebuild_schema_and_block(
-            _context->original_tablet_schema, variant_column_pos, flush_block, 
flush_schema);
-
-    {
-        // Update rowset schema, tablet's tablet schema will be updated when 
build Rowset
-        // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
-        // ctx.tablet_schema:  A(bigint), B(double)
-        // => update_schema:   A(bigint), B(double), C(int), D(int)
-        std::lock_guard<std::mutex> lock(*(_context->schema_lock));
-        TabletSchemaSPtr update_schema;
-        RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
-                {_context->tablet_schema, flush_schema}, nullptr, 
update_schema));
-        CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
-                << "Rowset merge schema columns count is " << 
update_schema->num_columns()
-                << ", but flush_schema is larger " << 
flush_schema->num_columns()
-                << " update_schema: " << update_schema->dump_structure()
-                << " flush_schema: " << flush_schema->dump_structure();
-        _context->tablet_schema.swap(update_schema);
-        VLOG_DEBUG << "dump rs schema: " << 
_context->tablet_schema->dump_structure();
-    }
-
-    block.swap(flush_block); // NOLINT(bugprone-use-after-move)
-    VLOG_DEBUG << "dump block: " << block.dump_data();
-    VLOG_DEBUG << "dump flush schema: " << flush_schema->dump_structure();
+    ctx.record_raw_json_column = _context->tablet_schema->store_row_column();
+    RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, 
variant_column_pos, ctx));
     return Status::OK();
 }
 
@@ -194,8 +145,7 @@ Status 
SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWrit
 }
 
 Status 
SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
 writer,
-                                              int32_t segment_id, bool 
no_compression,
-                                              TabletSchemaSPtr flush_schema) {
+                                              int32_t segment_id, bool 
no_compression) {
     io::FileWriterPtr file_writer;
     RETURN_IF_ERROR(_context->file_writer_creator->create(segment_id, 
file_writer));
 
@@ -207,10 +157,10 @@ Status 
SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
         writer_options.compression_type = NO_COMPRESSION;
     }
 
-    const auto& tablet_schema = flush_schema ? flush_schema : 
_context->tablet_schema;
-    writer.reset(new segment_v2::SegmentWriter(
-            file_writer.get(), segment_id, tablet_schema, _context->tablet, 
_context->data_dir,
-            _context->max_rows_per_segment, writer_options, 
_context->mow_context));
+    writer.reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id,
+                                               _context->tablet_schema, 
_context->tablet,
+                                               _context->data_dir, 
_context->max_rows_per_segment,
+                                               writer_options, 
_context->mow_context));
     {
         std::lock_guard<SpinLock> l(_lock);
         _file_writers.emplace(segment_id, std::move(file_writer));
@@ -226,7 +176,7 @@ Status 
SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
 
 Status SegmentFlusher::_create_segment_writer(
         std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t 
segment_id,
-        bool no_compression, TabletSchemaSPtr flush_schema) {
+        bool no_compression) {
     io::FileWriterPtr file_writer;
     RETURN_IF_ERROR(_context->file_writer_creator->create(segment_id, 
file_writer));
 
@@ -238,10 +188,10 @@ Status SegmentFlusher::_create_segment_writer(
         writer_options.compression_type = NO_COMPRESSION;
     }
 
-    const auto& tablet_schema = flush_schema ? flush_schema : 
_context->tablet_schema;
     writer.reset(new segment_v2::VerticalSegmentWriter(
-            file_writer.get(), segment_id, tablet_schema, _context->tablet, 
_context->data_dir,
-            _context->max_rows_per_segment, writer_options, 
_context->mow_context));
+            file_writer.get(), segment_id, _context->tablet_schema, 
_context->tablet,
+            _context->data_dir, _context->max_rows_per_segment, writer_options,
+            _context->mow_context));
     {
         std::lock_guard<SpinLock> l(_lock);
         _file_writers.emplace(segment_id, std::move(file_writer));
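
The rewritten _parse_variant_columns no longer consults the tablet schema for
variant positions; it scans the block itself. A simplified sketch of that scan
(column types are modeled as an enum here; the real check is
vectorized::is_variant_type(remove_nullable(entry.type))):

    #include <cassert>
    #include <vector>

    enum class TypeId { Int, String, Variant };

    struct ColumnWithType {
        TypeId type;
        bool nullable = false;
    };

    TypeId remove_nullable(const ColumnWithType& c) { return c.type; } // simplified

    std::vector<int> find_variant_positions(const std::vector<ColumnWithType>& block) {
        std::vector<int> pos;
        for (int i = 0; i < static_cast<int>(block.size()); ++i) {
            if (remove_nullable(block[i]) == TypeId::Variant) {
                pos.push_back(i);
            }
        }
        return pos;
    }

    int main() {
        std::vector<ColumnWithType> block {
                {TypeId::Int}, {TypeId::Variant, /*nullable=*/true}, {TypeId::Variant}};
        auto pos = find_variant_positions(block);
        assert(pos == (std::vector<int> {1, 2})); // only these columns get parsed
    }
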
diff --git a/be/src/olap/rowset/segment_creator.h 
b/be/src/olap/rowset/segment_creator.h
index 214322ed8d5..93508e9629d 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -138,17 +138,15 @@ public:
     bool need_buffering();
 
 private:
-    Status _expand_variant_to_subcolumns(vectorized::Block& block, 
TabletSchemaSPtr& flush_schema);
+    Status _parse_variant_columns(vectorized::Block& block);
     Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& 
segment_writer,
                      const vectorized::Block* block, size_t row_offset, size_t 
row_num);
     Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& 
segment_writer,
                      const vectorized::Block* block, size_t row_offset, size_t 
row_num);
     Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& 
writer,
-                                  int32_t segment_id, bool no_compression = 
false,
-                                  TabletSchemaSPtr flush_schema = nullptr);
+                                  int32_t segment_id, bool no_compression = 
false);
     Status 
_create_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& 
writer,
-                                  int32_t segment_id, bool no_compression = 
false,
-                                  TabletSchemaSPtr flush_schema = nullptr);
+                                  int32_t segment_id, bool no_compression = 
false);
     Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& 
writer,
                                  TabletSchemaSPtr flush_schema = nullptr,
                                  int64_t* flush_size = nullptr);
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp 
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
index 66ad0eb92a9..dcc082c22ae 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -34,10 +34,9 @@ namespace segment_v2 {
 Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
                                       vectorized::PathInData path,
                                       const SubcolumnColumnReaders::Node* node,
-                                      const SubcolumnColumnReaders::Node* root,
-                                      bool output_as_raw_json) {
+                                      const SubcolumnColumnReaders::Node* 
root) {
     // Non-leaf nodes need to be merged with the root
-    auto* stream_iter = new HierarchicalDataReader(path, output_as_raw_json);
+    auto* stream_iter = new HierarchicalDataReader(path);
     std::vector<const SubcolumnColumnReaders::Node*> leaves;
     vectorized::PathsInData leaves_paths;
     SubcolumnColumnReaders::get_leaves_of_node(node, leaves, leaves_paths);
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h 
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index 67f78651416..1d02685e445 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -64,12 +64,11 @@ using SubcolumnColumnReaders = 
vectorized::SubcolumnsTree<SubcolumnReader>;
 // Reader for hierarchical data for variant, merge with root(sparse encoded 
columns)
 class HierarchicalDataReader : public ColumnIterator {
 public:
-    HierarchicalDataReader(const vectorized::PathInData& path, bool 
output_as_raw_json = false)
-            : _path(path), _output_as_raw_json(output_as_raw_json) {}
+    HierarchicalDataReader(const vectorized::PathInData& path) : _path(path) {}
 
     static Status create(std::unique_ptr<ColumnIterator>* reader, 
vectorized::PathInData path,
                          const SubcolumnColumnReaders::Node* target_node,
-                         const SubcolumnColumnReaders::Node* root, bool 
output_as_raw_json = false);
+                         const SubcolumnColumnReaders::Node* root);
 
     Status init(const ColumnIteratorOptions& opts) override;
 
@@ -93,7 +92,6 @@ private:
     std::unique_ptr<StreamReader> _root_reader;
     size_t _rows_read = 0;
     vectorized::PathInData _path;
-    bool _output_as_raw_json = false;
 
     template <typename NodeFunction>
     Status tranverse(NodeFunction&& node_func) {
@@ -154,26 +152,9 @@ private:
             return Status::OK();
         }));
 
-        if (_output_as_raw_json) {
-            auto col_to = vectorized::ColumnString::create();
-            col_to->reserve(nrows * 2);
-            vectorized::VectorBufferWriter write_buffer(*col_to.get());
-            auto type = std::make_shared<vectorized::DataTypeObject>();
-            for (size_t i = 0; i < nrows; ++i) {
-                type->to_string(container_variant, i, write_buffer);
-                write_buffer.commit();
-            }
-            if (variant.empty()) {
-                
variant.create_root(std::make_shared<vectorized::DataTypeString>(),
-                                    std::move(col_to));
-            } else {
-                variant.get_root()->insert_range_from(*col_to, 0, 
col_to->size());
-            }
-        } else {
-            // TODO select v:b -> v.b / v.b.c but v.d maybe in v
-            // copy container variant to dst variant, todo avoid copy
-            variant.insert_range_from(container_variant, 0, nrows);
-        }
+        // TODO select v:b -> v.b / v.b.c but v.d maybe in v
+        // copy container variant to dst variant, todo avoid copy
+        variant.insert_range_from(container_variant, 0, nrows);
 
         // variant.set_num_rows(nrows);
         _rows_read += nrows;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index f2ec504c90f..f429cb4afb9 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -536,24 +536,19 @@ Status Segment::new_column_iterator_with_path(const 
TabletColumn& tablet_column,
     auto sparse_node = tablet_column.has_path_info()
                                ? 
_sparse_column_tree.find_exact(*tablet_column.path_info_ptr())
                                : nullptr;
-    if (opt != nullptr && opt->io_ctx.reader_type == 
ReaderType::READER_ALTER_TABLE) {
-        CHECK(tablet_column.is_variant_type());
-        if (root == nullptr) {
-            // No such variant column in this segment, get a default one
-            RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
-            return Status::OK();
-        }
-        bool output_as_raw_json = true;
-        // Alter table operation should read the whole variant column, since 
it does not aware of
-        // subcolumns of variant during processing rewriting rowsets.
-        // This is slow, since it needs to read all sub columns and merge them 
into a single column
-        RETURN_IF_ERROR(
-                HierarchicalDataReader::create(iter, root_path, root, root, 
output_as_raw_json));
-        return Status::OK();
-    }
 
-    if (opt == nullptr || opt->io_ctx.reader_type != ReaderType::READER_QUERY) 
{
-        // Could be compaction ..etc and read flat leaves nodes data
+    // Currently only compaction and checksum need to read flat leaves.
+    // They both use tablet_schema_with_merged_max_schema_version as the read schema.
+    auto type_to_read_flat_leaves = [](ReaderType type) {
+        return type == ReaderType::READER_BASE_COMPACTION ||
+               type == ReaderType::READER_CUMULATIVE_COMPACTION ||
+               type == ReaderType::READER_COLD_DATA_COMPACTION ||
+               type == ReaderType::READER_SEGMENT_COMPACTION ||
+               type == ReaderType::READER_FULL_COMPACTION || type == 
ReaderType::READER_CHECKSUM;
+    };
+
+    if (opt != nullptr && type_to_read_flat_leaves(opt->io_ctx.reader_type)) {
+        // compaction needs to read flat leaf node data to prevent read amplification
         const auto* node = tablet_column.has_path_info()
                                    ? 
_sub_column_tree.find_leaf(*tablet_column.path_info_ptr())
                                    : nullptr;
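
The type_to_read_flat_leaves lambda above can be restated standalone, which makes
the gating easy to verify in isolation (the enum values mirror the reader types
named in the diff; everything else is a stand-in):

    #include <cassert>

    enum class ReaderType {
        READER_QUERY,
        READER_BASE_COMPACTION,
        READER_CUMULATIVE_COMPACTION,
        READER_COLD_DATA_COMPACTION,
        READER_SEGMENT_COMPACTION,
        READER_FULL_COMPACTION,
        READER_CHECKSUM,
        READER_ALTER_TABLE,
    };

    bool type_to_read_flat_leaves(ReaderType type) {
        return type == ReaderType::READER_BASE_COMPACTION ||
               type == ReaderType::READER_CUMULATIVE_COMPACTION ||
               type == ReaderType::READER_COLD_DATA_COMPACTION ||
               type == ReaderType::READER_SEGMENT_COMPACTION ||
               type == ReaderType::READER_FULL_COMPACTION ||
               type == ReaderType::READER_CHECKSUM;
    }

    int main() {
        // Compaction and checksum read flat leaves; queries and alter-table
        // go through the hierarchical (merged) read path instead.
        assert(type_to_read_flat_leaves(ReaderType::READER_BASE_COMPACTION));
        assert(!type_to_read_flat_leaves(ReaderType::READER_QUERY));
        assert(!type_to_read_flat_leaves(ReaderType::READER_ALTER_TABLE));
    }
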
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 5eadac2abde..143e1b36329 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -140,7 +140,8 @@ void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* 
meta, uint32_t colum
     }
 }
 
-Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const 
TabletColumn& column) {
+Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const 
TabletColumn& column,
+                                                    const TabletSchemaSPtr& 
tablet_schema) {
     ColumnWriterOptions opts;
     opts.meta = _footer.add_columns();
 
@@ -148,9 +149,9 @@ Status 
VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
 
     // now we create zone map for key columns in AGG_KEYS or all column in 
UNIQUE_KEYS or DUP_KEYS
     // except for columns whose type don't support zone map.
-    opts.need_zone_map = column.is_key() || _tablet_schema->keys_type() != 
KeysType::AGG_KEYS;
+    opts.need_zone_map = column.is_key() || tablet_schema->keys_type() != 
KeysType::AGG_KEYS;
     opts.need_bloom_filter = column.is_bf_column();
-    auto* tablet_index = 
_tablet_schema->get_ngram_bf_index(column.unique_id());
+    auto* tablet_index = tablet_schema->get_ngram_bf_index(column.unique_id());
     if (tablet_index) {
         opts.need_bloom_filter = true;
         opts.is_ngram_bf_index = true;
@@ -166,12 +167,14 @@ Status 
VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
     }
     // skip write inverted index on load if skip_write_index_on_load is true
     if (_opts.write_type == DataWriteType::TYPE_DIRECT &&
-        _tablet_schema->skip_write_index_on_load()) {
+        tablet_schema->skip_write_index_on_load()) {
         skip_inverted_index = true;
     }
     // indexes for this column
-    opts.indexes = _tablet_schema->get_indexes_for_column(column);
+    opts.indexes = tablet_schema->get_indexes_for_column(column);
     if (!InvertedIndexColumnWriter::check_support_inverted_index(column)) {
+        // skip inverted index if invalid
+        opts.indexes.clear();
         opts.need_zone_map = false;
         opts.need_bloom_filter = false;
         opts.need_bitmap_index = false;
@@ -302,7 +305,8 @@ void 
VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl
 //       2.2 build read plan to read by batch
 //       2.3 fill block
 // 3. set columns to data convertor and then write all columns
-Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& 
data) {
+Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& 
data,
+                                                                 
vectorized::Block& full_block) {
     if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
         // TODO(plat1ko): CloudStorageEngine
         return Status::NotSupported("append_block_with_partial_content");
@@ -313,7 +317,7 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
 
     auto tablet = static_cast<Tablet*>(_tablet.get());
     // create full block and fill with input columns
-    auto full_block = _tablet_schema->create_block();
+    full_block = _tablet_schema->create_block();
     const auto& including_cids = 
_opts.rowset_ctx->partial_update_info->update_cids;
     size_t input_id = 0;
     for (auto i : including_cids) {
@@ -702,16 +706,127 @@ Status VerticalSegmentWriter::batch_block(const 
vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// For variant types, we do the following steps to fill the content of the block:
+// 1. set block data to the data convertor and get all flattened columns from variant subcolumns
+// 2. get sparse columns from the previous sparse columns stripped in OlapColumnDataConvertorVariant
+// 3. merge the current column info (containing extracted columns) with the previous merged_tablet_schema,
+//    which will be used to construct the new schema for the rowset
+Status 
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) 
{
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." + 
entry->path.get_path();
+            const vectorized::DataTypePtr& final_data_type_from_object =
+                    entry->data.get_least_common_type();
+            vectorized::PathInDataBuilder full_path_builder;
+            auto full_path = 
full_path_builder.append(parent_column->name_lower_case(), false)
+                                     .append(entry->path.get_parts(), false)
+                                     .build();
+            return vectorized::schema_util::get_column_by_type(
+                    final_data_type_from_object, column_name,
+                    vectorized::schema_util::ExtraInfo {
+                            .unique_id = -1,
+                            .parent_unique_id = parent_column->unique_id(),
+                            .path_info = full_path});
+        };
+
+        CHECK(object_column.is_finalized());
+        // common extracted columns
+        for (const auto& entry :
+             
vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) 
{
+            if (entry->path.empty()) {
+                // already handled by parent column
+                continue;
+            }
+            CHECK(entry->data.is_finalized());
+            int current_column_id = column_id++;
+            TabletColumn tablet_column = generate_column_info(entry);
+            vectorized::schema_util::inherit_column_attributes(*parent_column, 
tablet_column,
+                                                               _flush_schema);
+            RETURN_IF_ERROR(_create_column_writer(current_column_id 
/*unused*/, tablet_column,
+                                                  _flush_schema));
+            
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                    {entry->data.get_finalized_column_ptr()->get_ptr(),
+                     entry->data.get_least_common_type(), 
tablet_column.name()},
+                    data.row_pos, data.num_rows, current_column_id));
+            // convert column data from engine format to storage layer format
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(current_column_id);
+            if (!status.ok()) {
+                return status;
+            }
+            RETURN_IF_ERROR(_column_writers[current_column_id]->append(
+                    column->get_nullmap(), column->get_data(), data.num_rows));
+            _flush_schema->append_column(tablet_column);
+            _olap_data_convertor->clear_source_content();
+        }
+        // sparse_columns
+        for (const auto& entry : 
vectorized::schema_util::get_sorted_subcolumns(
+                     object_column.get_sparse_subcolumns())) {
+            TabletColumn sparse_tablet_column = generate_column_info(entry);
+            _flush_schema->mutable_column_by_uid(parent_column->unique_id())
+                    .append_sparse_column(sparse_tablet_column);
+
+            // add sparse column to footer
+            auto* column_pb = _footer.mutable_columns(i);
+            _init_column_meta(column_pb->add_sparse_columns(), -1, 
sparse_tablet_column);
+        }
+    }
+
+    // Update the rowset schema; the tablet's schema will be updated when building the Rowset
+    // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
+    // ctx.tablet_schema:  A(bigint), B(double)
+    // => update_schema:   A(bigint), B(double), C(int), D(int)
+    std::lock_guard<std::mutex> lock(*(_opts.rowset_ctx->schema_lock));
+    if (_opts.rowset_ctx->merged_tablet_schema == nullptr) {
+        _opts.rowset_ctx->merged_tablet_schema = 
_opts.rowset_ctx->tablet_schema;
+    }
+    TabletSchemaSPtr update_schema;
+    RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
+            {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, 
update_schema));
+    CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns())
+            << "Rowset merge schema columns count is " << 
update_schema->num_columns()
+            << ", but flush_schema is larger " << _flush_schema->num_columns()
+            << " update_schema: " << update_schema->dump_structure()
+            << " flush_schema: " << _flush_schema->dump_structure();
+    _opts.rowset_ctx->merged_tablet_schema.swap(update_schema);
+    VLOG_DEBUG << "dump block " << data.block->dump_data();
+    VLOG_DEBUG << "dump rs schema: " << 
_opts.rowset_ctx->merged_tablet_schema->dump_full_schema();
+    VLOG_DEBUG << "rowset : " << _opts.rowset_ctx->rowset_id << ", seg id : " 
<< _segment_id;
+    return Status::OK();
+}
+
 Status VerticalSegmentWriter::write_batch() {
     if (_opts.rowset_ctx->partial_update_info &&
         _opts.rowset_ctx->partial_update_info->is_partial_update &&
         _opts.write_type == DataWriteType::TYPE_DIRECT &&
         !_opts.rowset_ctx->is_transient_rowset_writer) {
         for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
-            RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid)));
+            RETURN_IF_ERROR(
+                    _create_column_writer(cid, _tablet_schema->column(cid), 
_tablet_schema));
         }
+        vectorized::Block full_block;
         for (auto& data : _batched_blocks) {
-            RETURN_IF_ERROR(_append_block_with_partial_content(data));
+            RETURN_IF_ERROR(_append_block_with_partial_content(data, 
full_block));
+        }
+        for (auto& data : _batched_blocks) {
+            RowsInBlock full_rows_block {&full_block, data.row_pos, 
data.num_rows};
+            
RETURN_IF_ERROR(_append_block_with_variant_subcolumns(full_rows_block));
         }
         for (auto& column_writer : _column_writers) {
             RETURN_IF_ERROR(column_writer->finish());
@@ -733,7 +848,7 @@ Status VerticalSegmentWriter::write_batch() {
     std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
     vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
     for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
-        RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid)));
+        RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid), _tablet_schema));
         for (auto& data : _batched_blocks) {
             
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
                     data.block, data.row_pos, data.num_rows, 
std::vector<uint32_t> {cid}));
@@ -806,6 +921,19 @@ Status VerticalSegmentWriter::write_batch() {
         _num_rows_written += data.num_rows;
     }
 
+    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
+        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
+        size_t original_writers_cnt = _column_writers.size();
+        // handle variant dynamic sub columns
+        for (auto& data : _batched_blocks) {
+            RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data));
+        }
+        for (size_t i = original_writers_cnt; i < _column_writers.size(); ++i) 
{
+            RETURN_IF_ERROR(_column_writers[i]->finish());
+            RETURN_IF_ERROR(_column_writers[i]->write_data());
+        }
+    }
+
     _batched_blocks.clear();
     return Status::OK();
 }
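
A small sketch of how generate_column_info above derives the flat column name for
an extracted variant subcolumn: the parent variant's lower-cased name joined with
the subcolumn path. Path handling is simplified to string parts here (the real
code builds a PathInData and records parent_unique_id on the column):

    #include <cassert>
    #include <string>
    #include <vector>

    std::string make_subcolumn_name(std::string parent_lower,
                                    const std::vector<std::string>& path_parts) {
        std::string name = std::move(parent_lower);
        for (const auto& part : path_parts) {
            name += "." + part; // flat dotted name, e.g. "v.a.b"
        }
        return name;
    }

    int main() {
        // variant column v with subcolumn path a.b becomes flat column "v.a.b"
        assert(make_subcolumn_name("v", {"a", "b"}) == "v.a.b");
    }
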
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
index ffa5f3807ae..8fd854c3e95 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -117,11 +117,14 @@ public:
     Slice min_encoded_key();
     Slice max_encoded_key();
 
+    TabletSchemaSPtr flush_schema() const { return _flush_schema; };
+
     void clear();
 
 private:
     void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const 
TabletColumn& column);
-    Status _create_column_writer(uint32_t cid, const TabletColumn& column);
+    Status _create_column_writer(uint32_t cid, const TabletColumn& column,
+                                 const TabletSchemaSPtr& schema);
     size_t _calculate_inverted_index_file_size();
     uint64_t _estimated_remaining_size();
     Status _write_ordinal_index();
@@ -146,7 +149,8 @@ private:
     void _set_min_key(const Slice& key);
     void _set_max_key(const Slice& key);
     void _serialize_block_to_row_column(vectorized::Block& block);
-    Status _append_block_with_partial_content(RowsInBlock& data);
+    Status _append_block_with_partial_content(RowsInBlock& data, 
vectorized::Block& full_block);
+    Status _append_block_with_variant_subcolumns(RowsInBlock& data);
     Status _fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
                                  const std::vector<bool>& 
use_default_or_null_flag,
                                  bool has_default_or_nullable, const size_t& 
segment_start_pos,
@@ -204,6 +208,9 @@ private:
     std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
 
     std::vector<RowsInBlock> _batched_blocks;
+
+    // contains auto-generated columns; should be nullptr if there are no variant subcolumns
+    TabletSchemaSPtr _flush_schema = nullptr;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 23232c4d0a5..32a6ba88ce7 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -305,12 +305,22 @@ Status RowsetBuilder::commit_txn() {
     }
     std::lock_guard<std::mutex> l(_lock);
     SCOPED_TIMER(_commit_txn_timer);
-    if (tablet()->tablet_schema()->num_variant_columns() > 0) {
+
+    const RowsetWriterContext& rw_ctx = _rowset_writer->context();
+    if (rw_ctx.tablet_schema->num_variant_columns() > 0) {
+        // Merge the schema with `rw_ctx.merged_tablet_schema` first: the merged
+        // schema keeps the newest merged schema for the rowset, updated and
+        // merged while flushing segments.
+        if (rw_ctx.merged_tablet_schema != nullptr) {
+            
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.merged_tablet_schema));
+        }
+        // We should still merge the rowset schema, since merged_tablet_schema may
+        // be null: when enable_memtable_on_sink_node is true, the merged schema
+        // is not passed to the destination backend.
        // update the tablet schema when variant columns are present, before commit_txn
         // Eg. rowset schema:       A(int),    B(float),  C(int), D(int)
        // _tablet->tablet_schema:  A(bigint), B(double)
         //  => update_schema:       A(bigint), B(double), C(int), D(int)
-        const RowsetWriterContext& rw_ctx = _rowset_writer->context();
         
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
     }
     // Transfer ownership of `PendingRowsetGuard` to `TxnManager`
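
A sketch of the two-step schema update in commit_txn, with schemas again reduced
to column-name sets (an assumption): merge rw_ctx.merged_tablet_schema first if
present, then rw_ctx.tablet_schema, so the tablet still converges when
enable_memtable_on_sink_node left merged_tablet_schema null.

    #include <cassert>
    #include <memory>
    #include <set>
    #include <string>

    using Schema = std::set<std::string>;
    using SchemaPtr = std::shared_ptr<Schema>;

    void update_by_least_common_schema(Schema& tablet, const Schema& update) {
        tablet.insert(update.begin(), update.end()); // simplified least-common merge
    }

    int main() {
        Schema tablet {"k", "v"};
        SchemaPtr merged = std::make_shared<Schema>(Schema{"k", "v", "v.a"});
        Schema ctx_schema {"k", "v"};

        if (merged != nullptr) {
            update_by_least_common_schema(tablet, *merged); // newest merged schema first
        }
        update_by_least_common_schema(tablet, ctx_schema);  // then the context schema
        assert(tablet == (Schema{"k", "v", "v.a"}));
    }
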
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index a4ed6a527bf..a0483ad5d8e 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -927,7 +927,8 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
         // This is because the schema change for a variant needs to ignore the 
extracted columns.
         // Otherwise, the schema types in different rowsets might be 
inconsistent. When performing a schema change,
         // the complete variant is constructed by reading all the sub-columns 
of the variant.
-        sc_params.new_tablet_schema = 
new_tablet->tablet_schema()->copy_without_extracted_columns();
+        sc_params.new_tablet_schema =
+                
new_tablet->tablet_schema()->copy_without_variant_extracted_columns();
         sc_params.ref_rowset_readers.reserve(rs_splits.size());
         for (RowSetSplits& split : rs_splits) {
             sc_params.ref_rowset_readers.emplace_back(split.rs_reader);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index b48262250ed..0a791614b87 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2120,7 +2120,11 @@ Status Tablet::create_transient_rowset_writer(
     context.rowset_state = PREPARED;
     context.segments_overlap = OVERLAPPING;
     context.tablet_schema = std::make_shared<TabletSchema>();
-    context.tablet_schema->copy_from(*(rowset_ptr->tablet_schema()));
+    // During a partial update, the extracted columns of a variant should not 
be included in the tablet schema.
+    // This is because the partial update for a variant needs to ignore the 
extracted columns.
+    // Otherwise, the schema types in different rowsets might be inconsistent. 
When performing a partial update,
+    // the complete variant is constructed by reading all the sub-columns of 
the variant.
+    context.tablet_schema = 
rowset_ptr->tablet_schema()->copy_without_variant_extracted_columns();
     context.newest_write_timestamp = UnixSeconds();
     context.tablet_id = table_id();
     context.enable_segcompaction = false;
@@ -2945,6 +2949,13 @@ Status 
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 (std::find(including_cids.cbegin(), including_cids.cend(),
                            rowset_schema->sequence_col_idx()) != 
including_cids.cend());
     }
+    if (rowset_schema->num_variant_columns() > 0) {
+        // During partial updates, the extracted columns of a variant should 
not be included in the rowset schema.
+        // This is because the partial update for a variant needs to ignore 
the extracted columns.
+        // Otherwise, the schema types in different rowsets might be 
inconsistent. When performing a partial update,
+        // the complete variant is constructed by reading all the sub-columns 
of the variant.
+        rowset_schema = 
rowset_schema->copy_without_variant_extracted_columns();
+    }
     // use for partial update
     PartialUpdateReadPlan read_plan_ori;
     PartialUpdateReadPlan read_plan_update;
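
A sketch of the effect of copy_without_variant_extracted_columns used above:
columns extracted from a variant (identified here by a non-negative
parent_unique_id, an assumption about the marker) are dropped so rowset schemas
stay type-consistent during partial updates.

    #include <cassert>
    #include <vector>

    struct Column {
        int unique_id;
        int parent_unique_id; // >= 0 marks an extracted variant subcolumn
    };

    std::vector<Column> copy_without_extracted(const std::vector<Column>& cols) {
        std::vector<Column> out;
        for (const auto& c : cols) {
            if (c.parent_unique_id < 0) {
                out.push_back(c); // keep only top-level columns, incl. the variant root
            }
        }
        return out;
    }

    int main() {
        std::vector<Column> cols {{0, -1}, {1, -1}, {2, 1}, {3, 1}}; // 2,3 extracted from 1
        auto stripped = copy_without_extracted(cols);
        assert(stripped.size() == 2);
    }
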
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 0900c6d8d40..26d9d913f2f 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -563,6 +563,10 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
         _column_path->from_protobuf(column.column_path_info());
         _parent_col_unique_id = 
column.column_path_info().parrent_column_unique_id();
     }
+    if (is_variant_type() && !column.has_column_path_info()) {
+        // set path info for the variant root column so that it is not missing
+        _column_path = 
std::make_shared<vectorized::PathInData>(_col_name_lower_case);
+    }
     for (auto& column_pb : column.sparse_columns()) {
         TabletColumn column;
         column.init_from_pb(column_pb);
@@ -854,7 +858,8 @@ void TabletSchema::append_column(TabletColumn column, 
ColumnType col_type) {
     _cols.push_back(std::make_shared<TabletColumn>(std::move(column)));
     // The dropped column may have the same name as an existing column, so it is
     // not added to the name-to-index map, only to the uid-to-index map
-    if (col_type == ColumnType::VARIANT || _cols.back()->is_variant_type()) {
+    if (col_type == ColumnType::VARIANT || _cols.back()->is_variant_type() ||
+        _cols.back()->is_extracted_column()) {
         _field_name_to_index.emplace(StringRef(_cols.back()->name()), 
_num_columns);
         _field_path_to_index[_cols.back()->path_info_ptr().get()] = 
_num_columns;
     } else if (col_type == ColumnType::NORMAL) {
@@ -1112,7 +1117,7 @@ void TabletSchema::merge_dropped_columns(const 
TabletSchema& src_schema) {
     }
 }
 
-TabletSchemaSPtr TabletSchema::copy_without_extracted_columns() {
+TabletSchemaSPtr TabletSchema::copy_without_variant_extracted_columns() {
     TabletSchemaSPtr copy = std::make_shared<TabletSchema>();
     TabletSchemaPB tablet_schema_pb;
     this->to_schema_pb(&tablet_schema_pb);
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index b8f26a1f601..bd3b1f6ca4e 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -454,7 +454,7 @@ public:
 
     vectorized::Block create_block_by_cids(const std::vector<uint32_t>& cids);
 
-    std::shared_ptr<TabletSchema> copy_without_extracted_columns();
+    std::shared_ptr<TabletSchema> copy_without_variant_extracted_columns();
     InvertedIndexStorageFormatPB get_inverted_index_storage_format() const {
         return _inverted_index_storage_format;
     }
diff --git a/be/src/vec/columns/column_object.cpp 
b/be/src/vec/columns/column_object.cpp
index 3bae978f4d3..1f59e000d32 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -749,8 +749,15 @@ void ColumnObject::insert_from(const IColumn& src, size_t 
n) {
 void ColumnObject::try_insert(const Field& field) {
     if (field.get_type() != Field::Types::VariantMap) {
         auto* root = get_subcolumn({});
-        if (!root) {
-            doris::Exception(doris::ErrorCode::INVALID_ARGUMENT, "Failed to 
find root column_path");
+        // Inserting into an empty ColumnObject may leave the root null,
+        // so creating a root column for the Variant is expected here.
+        if (root == nullptr) {
+            bool succ = add_sub_column({}, num_rows);
+            if (!succ) {
+                throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                                       "Failed to add root sub column {}");
+            }
+            root = get_subcolumn({});
         }
         root->insert(field);
         ++num_rows;
@@ -1290,9 +1297,11 @@ Status ColumnObject::merge_sparse_to_root_column() {
                                        
parser.getWriter().getOutput()->getSize());
         result_column_nullable->get_null_map_data().push_back(0);
     }
-
-    // assign merged column
-    subcolumns.get_mutable_root()->data.get_finalized_column_ptr() = 
mresult->get_ptr();
+    subcolumns.get_mutable_root()->data.get_finalized_column().clear();
+    // assign the merged column; use insert_range_from to make a copy instead of
+    // replacing the ptr itself, so that the root column ptr is not changed
+    
subcolumns.get_mutable_root()->data.get_finalized_column().insert_range_from(
+            *mresult->get_ptr(), 0, num_rows);
     return Status::OK();
 }
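
A sketch of the try_insert fix above: inserting a scalar into an empty
ColumnObject now creates the root subcolumn instead of failing with
INVALID_ARGUMENT. The subcolumn tree is modeled as a map keyed by a path string
(an assumption; the real structure is a Subcolumns tree keyed by PathInData):

    #include <cassert>
    #include <map>
    #include <string>
    #include <vector>

    struct ColumnObjectSketch {
        std::map<std::string, std::vector<int>> subcolumns; // path -> values
        size_t num_rows = 0;

        void try_insert(int field) {
            auto it = subcolumns.find(""); // "" stands for the root path
            if (it == subcolumns.end()) {
                // root missing on an empty column: create it, padded to num_rows
                it = subcolumns.emplace("", std::vector<int>(num_rows, 0)).first;
            }
            it->second.push_back(field);
            ++num_rows;
        }
    };

    int main() {
        ColumnObjectSketch col;
        col.try_insert(42); // previously threw on an empty column
        assert(col.num_rows == 1 && col.subcolumns.at("").back() == 42);
    }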
 
diff --git a/be/src/vec/columns/column_object.h 
b/be/src/vec/columns/column_object.h
index 55abd534dd1..53516877b6d 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -230,10 +230,6 @@ private:
     // this structure and fill with Subcolumns sub items
     mutable std::shared_ptr<rapidjson::Document> doc_structure;
 
-    // column with raw json strings
-    // used for quickly row store encoding
-    ColumnPtr rowstore_column;
-
     using SubColumnWithName = std::pair<PathInData, const Subcolumn*>;
     // Cached search results for previous row (keyed as index in JSON object) 
- used as a hint.
     mutable std::vector<SubColumnWithName> _prev_positions;
@@ -259,10 +255,6 @@ public:
         return 
subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable();
     }
 
-    void set_rowstore_column(ColumnPtr col) { rowstore_column = col; }
-
-    ColumnPtr get_rowstore_column() const { return rowstore_column; }
-
     Status serialize_one_row_to_string(int row, std::string* output) const;
 
     Status serialize_one_row_to_string(int row, BufferWritable& output) const;
diff --git a/be/src/vec/common/schema_util.cpp 
b/be/src/vec/common/schema_util.cpp
index e8fd23f7569..016336d4098 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -374,45 +374,44 @@ void update_least_sparse_column(const 
std::vector<TabletSchemaSPtr>& schemas,
     update_least_schema_internal(subcolumns_types, common_schema, true, 
variant_col_unique_id);
 }
 
-void inherit_root_attributes(TabletSchemaSPtr& schema) {
-    std::unordered_map<int32_t, TabletIndex> variants_index_meta;
-    // Get all variants tablet index metas if exist
-    for (const auto& col : schema->columns()) {
-        auto index_meta = schema->get_inverted_index(col->unique_id(), "");
-        if (col->is_variant_type() && index_meta != nullptr) {
-            variants_index_meta.emplace(col->unique_id(), *index_meta);
+void inherit_column_attributes(const TabletColumn& source, TabletColumn& 
target,
+                               TabletSchemaSPtr& target_schema) {
+    if (target.type() != FieldType::OLAP_FIELD_TYPE_TINYINT &&
+        target.type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
+        target.type() != FieldType::OLAP_FIELD_TYPE_DOUBLE &&
+        target.type() != FieldType::OLAP_FIELD_TYPE_FLOAT) {
+        // above types are not supported in bf
+        target.set_is_bf_column(source.is_bf_column());
+    }
+    target.set_aggregation_method(source.aggregation());
+    const auto* source_index_meta = 
target_schema->get_inverted_index(source.unique_id(), "");
+    if (source_index_meta != nullptr) {
+        // add index meta
+        TabletIndex index_info = *source_index_meta;
+        
index_info.set_escaped_escaped_index_suffix_path(target.path_info_ptr()->get_path());
+        // get_inverted_index: No need to check, just inherit directly
+        const auto* target_index_meta = 
target_schema->get_inverted_index(target, false);
+        if (target_index_meta != nullptr) {
+            // already exist
+            target_schema->update_index(target, index_info);
+        } else {
+            target_schema->append_index(index_info);
         }
     }
+}
 
+void inherit_column_attributes(TabletSchemaSPtr& schema) {
     // Add index meta if extracted column is missing index meta
     for (size_t i = 0; i < schema->num_columns(); ++i) {
         TabletColumn& col = schema->mutable_column(i);
         if (!col.is_extracted_column()) {
             continue;
         }
-        if (col.type() != FieldType::OLAP_FIELD_TYPE_TINYINT &&
-            col.type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
-            col.type() != FieldType::OLAP_FIELD_TYPE_DOUBLE &&
-            col.type() != FieldType::OLAP_FIELD_TYPE_FLOAT) {
-            // above types are not supported in bf
-            
col.set_is_bf_column(schema->column_by_uid(col.parent_unique_id()).is_bf_column());
-        }
-        
col.set_aggregation_method(schema->column_by_uid(col.parent_unique_id()).aggregation());
-        auto it = variants_index_meta.find(col.parent_unique_id());
-        // variant has no index meta, ignore
-        if (it == variants_index_meta.end()) {
+        if (schema->field_index(col.parent_unique_id()) == -1) {
+            // parent column is missing, maybe dropped
             continue;
         }
-        auto index_meta = schema->get_inverted_index(col, false);
-        // add index meta
-        TabletIndex index_info = it->second;
-        
index_info.set_escaped_escaped_index_suffix_path(col.path_info_ptr()->get_path());
-        if (index_meta != nullptr) {
-            // already exist
-            schema->update_index(col, index_info);
-        } else {
-            schema->append_index(index_info);
-        }
+        
inherit_column_attributes(schema->column_by_uid(col.parent_unique_id()), col, 
schema);
     }
 }
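
A sketch of the per-column inheritance factored out above: an extracted
subcolumn copies bloom-filter settings from its parent variant column unless its
type is one that bloom filters do not support. Flags are plain booleans here
(the real code also inherits the aggregation method and inverted-index metas):

    #include <cassert>

    enum class FieldType { Int, TinyInt, Array, Double, Float };

    struct Column {
        FieldType type;
        bool is_bf = false;
    };

    void inherit_column_attributes(const Column& source, Column& target) {
        bool bf_supported = target.type != FieldType::TinyInt &&
                            target.type != FieldType::Array &&
                            target.type != FieldType::Double &&
                            target.type != FieldType::Float;
        if (bf_supported) {
            target.is_bf = source.is_bf;
        }
    }

    int main() {
        Column parent {FieldType::Int, /*is_bf=*/true};
        Column sub_int {FieldType::Int};
        Column sub_dbl {FieldType::Double};
        inherit_column_attributes(parent, sub_int);
        inherit_column_attributes(parent, sub_dbl);
        assert(sub_int.is_bf && !sub_dbl.is_bf); // double not supported by bf
    }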
 
@@ -473,7 +472,7 @@ Status get_least_common_schema(const 
std::vector<TabletSchemaSPtr>& schemas,
         update_least_sparse_column(schemas, output_schema, unique_id, 
path_set);
     }
 
-    inherit_root_attributes(output_schema);
+    inherit_column_attributes(output_schema);
     if (check_schema_size &&
         output_schema->columns().size() > 
config::variant_max_merged_tablet_schema_size) {
         return Status::DataQualityError("Reached max column size limit {}",
@@ -483,25 +482,8 @@ Status get_least_common_schema(const 
std::vector<TabletSchemaSPtr>& schemas,
     return Status::OK();
 }
 
-Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& 
variant_pos,
-                                        const ParseContext& ctx) {
-    try {
-        // Parse each variant column from raw string column
-        RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, 
variant_pos, ctx));
-        vectorized::schema_util::finalize_variant_columns(block, variant_pos,
-                                                          false /*not ingore 
sparse*/);
-        RETURN_IF_ERROR(
-                
vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_pos));
-    } catch (const doris::Exception& e) {
-        // TODO more graceful, max_filter_ratio
-        LOG(WARNING) << "encounter execption " << e.to_string();
-        return Status::InternalError(e.to_string());
-    }
-    return Status::OK();
-}
-
-Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
-                             const ParseContext& ctx) {
+Status _parse_variant_columns(Block& block, const std::vector<int>& 
variant_pos,
+                              const ParseContext& ctx) {
     for (int i = 0; i < variant_pos.size(); ++i) {
         auto column_ref = block.get_by_position(variant_pos[i]).column;
         bool is_nullable = column_ref->is_nullable();
@@ -510,36 +492,8 @@ Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
         var.assume_mutable_ref().finalize();
 
         MutableColumnPtr variant_column;
-        bool record_raw_string_with_serialization = false;
-        // set
-        auto encode_rowstore = [&]() {
-            if (!ctx.record_raw_json_column) {
-                return Status::OK();
-            }
-            auto* var = static_cast<vectorized::ColumnObject*>(variant_column.get());
-            if (record_raw_string_with_serialization) {
-                // encode to raw json column
-                auto raw_column = vectorized::ColumnString::create();
-                for (size_t i = 0; i < var->rows(); ++i) {
-                    std::string raw_str;
-                    RETURN_IF_ERROR(var->serialize_one_row_to_string(i, &raw_str));
-                    raw_column->insert_data(raw_str.c_str(), raw_str.size());
-                }
-                var->set_rowstore_column(raw_column->get_ptr());
-            } else {
-                // use original input json column
-                auto original_var_root = vectorized::check_and_get_column<vectorized::ColumnObject>(
-                                                 remove_nullable(column_ref).get())
-                                                 ->get_root();
-                var->set_rowstore_column(original_var_root);
-            }
-            return Status::OK();
-        };
-
         if (!var.is_scalar_variant()) {
             variant_column = var.assume_mutable();
-            record_raw_string_with_serialization = true;
-            RETURN_IF_ERROR(encode_rowstore());
             // already parsed
             continue;
         }
@@ -576,8 +530,19 @@ Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
             result = ColumnNullable::create(result, null_map);
         }
         block.get_by_position(variant_pos[i]).column = result;
-        RETURN_IF_ERROR(encode_rowstore());
-        // block.get_by_position(variant_pos[i]).type = std::make_shared<DataTypeObject>("json", true);
+    }
+    return Status::OK();
+}
+
+Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
+                             const ParseContext& ctx) {
+    try {
+        // Parse each variant column from raw string column
+        RETURN_IF_ERROR(vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx));
+    } catch (const doris::Exception& e) {
+        // TODO more graceful, max_filter_ratio
+        LOG(WARNING) << "encounter exception " << e.to_string();
+        return Status::InternalError(e.to_string());
     }
     return Status::OK();
 }
@@ -597,53 +562,16 @@ void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
     }
 }
 
-Status encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& variant_pos) {
-    for (int i = 0; i < variant_pos.size(); ++i) {
-        auto& column_ref = block.get_by_position(variant_pos[i]).column->assume_mutable_ref();
-        auto& column =
-                column_ref.is_nullable()
-                        ? assert_cast<ColumnObject&>(
-                                  assert_cast<ColumnNullable&>(column_ref).get_nested_column())
-                        : assert_cast<ColumnObject&>(column_ref);
-        // Make sure the root node is jsonb storage type
-        auto expected_root_type = make_nullable(std::make_shared<ColumnObject::MostCommonType>());
-        column.ensure_root_node_type(expected_root_type);
-        RETURN_IF_ERROR(column.merge_sparse_to_root_column());
-    }
+Status encode_variant_sparse_subcolumns(ColumnObject& column) {
+    // Make sure the root node is jsonb storage type
+    auto expected_root_type = make_nullable(std::make_shared<ColumnObject::MostCommonType>());
+    column.ensure_root_node_type(expected_root_type);
+    RETURN_IF_ERROR(column.merge_sparse_to_root_column());
     return Status::OK();
 }
 
-static void _append_column(const TabletColumn& parent_variant,
-                           const ColumnObject::Subcolumns::NodePtr& subcolumn,
-                           TabletSchemaSPtr& to_append, bool is_sparse) {
-    // If column already exist in original tablet schema, then we pick common type
-    // and cast column to common type, and modify tablet column to common type,
-    // otherwise it's a new column
-    CHECK(to_append.use_count() == 1);
-    const std::string& column_name =
-            parent_variant.name_lower_case() + "." + subcolumn->path.get_path();
-    const vectorized::DataTypePtr& final_data_type_from_object =
-            subcolumn->data.get_least_common_type();
-    vectorized::PathInDataBuilder full_path_builder;
-    auto full_path = full_path_builder.append(parent_variant.name_lower_case(), false)
-                             .append(subcolumn->path.get_parts(), false)
-                             .build();
-    TabletColumn tablet_column = vectorized::schema_util::get_column_by_type(
-            final_data_type_from_object, column_name,
-            vectorized::schema_util::ExtraInfo {.unique_id = -1,
-                                                .parent_unique_id = parent_variant.unique_id(),
-                                                .path_info = full_path});
-
-    if (!is_sparse) {
-        to_append->append_column(std::move(tablet_column));
-    } else {
-        to_append->mutable_column_by_uid(parent_variant.unique_id())
-                .append_sparse_column(std::move(tablet_column));
-    }
-}
-
 // sort by paths in lexicographical order
-static vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
+vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
         const vectorized::ColumnObject::Subcolumns& subcolumns) {
     // sort by paths in lexicographical order
     vectorized::ColumnObject::Subcolumns sorted = subcolumns;
@@ -653,70 +581,12 @@ static vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
     return sorted;
 }
 
-void rebuild_schema_and_block(const TabletSchemaSPtr& original,
-                              const std::vector<int>& variant_positions, Block& flush_block,
-                              TabletSchemaSPtr& flush_schema) {
-    // rebuild schema and block with variant extracted columns
-
-    // 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema
-    // those columns are extracted columns, leave none extracted columns remain in original variant column, which is
-    // JSONB format at present.
-    // 2. Collect columns that need to be added or modified when data type changes or new columns encountered
-    for (size_t variant_pos : variant_positions) {
-        auto column_ref = flush_block.get_by_position(variant_pos).column;
-        bool is_nullable = column_ref->is_nullable();
-        const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
-                remove_nullable(column_ref)->assume_mutable_ref());
-        const TabletColumn& parent_column = *original->columns()[variant_pos];
-        CHECK(object_column.is_finalized());
-        std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root;
-        // common extracted columns
-        for (const auto& entry : get_sorted_subcolumns(object_column.get_subcolumns())) {
-            if (entry->path.empty()) {
-                // root
-                root = entry;
-                continue;
-            }
-            _append_column(parent_column, entry, flush_schema, false);
-            const std::string& column_name =
-                    parent_column.name_lower_case() + "." + entry->path.get_path();
-            flush_block.insert({entry->data.get_finalized_column_ptr()->get_ptr(),
-                                entry->data.get_least_common_type(), column_name});
-        }
-
-        // add sparse columns to flush_schema
-        for (const auto& entry : get_sorted_subcolumns(object_column.get_sparse_subcolumns())) {
-            _append_column(parent_column, entry, flush_schema, true);
-        }
-
-        // Create new variant column and set root column
-        auto obj = vectorized::ColumnObject::create(true, false);
-        // '{}' indicates a root path
-        static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
-                {}, root->data.get_finalized_column_ptr()->assume_mutable(),
-                root->data.get_least_common_type());
-        // // set for rowstore
-        if (original->store_row_column()) {
-            static_cast<vectorized::ColumnObject*>(obj.get())->set_rowstore_column(
-                    object_column.get_rowstore_column());
-        }
-        vectorized::ColumnPtr result = obj->get_ptr();
-        if (is_nullable) {
-            const auto& null_map = assert_cast<const vectorized::ColumnNullable&>(*column_ref)
-                                           .get_null_map_column_ptr();
-            result = vectorized::ColumnNullable::create(result, null_map);
-        }
-        flush_block.get_by_position(variant_pos).column = result;
-        vectorized::PathInDataBuilder full_root_path_builder;
-        auto full_root_path =
-                full_root_path_builder.append(parent_column.name_lower_case(), false).build();
-        TabletColumn new_col = flush_schema->column(variant_pos);
-        new_col.set_path_info(full_root_path);
-        flush_schema->replace_column(variant_pos, new_col);
-        VLOG_DEBUG << "set root_path : " << full_root_path.get_path();
-    }
+// ---------------------------
 
-    vectorized::schema_util::inherit_root_attributes(flush_schema);
+std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
+    Block tmp;
+    tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
+    return tmp.dump_data(0, tmp.rows());
 }
 
 // ---------------------------
@@ -747,13 +617,5 @@ Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst)
                   ->assume_mutable();
     return Status::OK();
 }
-// ---------------------------
-
-std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
-    Block tmp;
-    tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
-    return tmp.dump_data(0, tmp.rows());
-}
-// ---------------------------
 
 } // namespace doris::vectorized::schema_util
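
As an aside on the refactor above: all throwing work now lives in _parse_variant_columns, and the public parse_variant_columns is the single place where a doris::Exception becomes a Status. A minimal, self-contained sketch of that boundary pattern; the Status type and worker below are simplified stand-ins, not the Doris types:

    #include <iostream>
    #include <stdexcept>
    #include <string>
    #include <utility>

    // Stand-in for doris::Status (hypothetical, simplified).
    struct Status {
        bool ok_flag = true;
        std::string msg;
        static Status OK() { return {}; }
        static Status InternalError(std::string m) { return {false, std::move(m)}; }
        bool ok() const { return ok_flag; }
    };

    // Internal worker: may throw, mirroring _parse_variant_columns.
    Status parse_impl(bool fail) {
        if (fail) throw std::runtime_error("bad variant json");
        return Status::OK();
    }

    // Public entry: converts exceptions to Status exactly once, at the API
    // boundary, mirroring parse_variant_columns in the patch.
    Status parse(bool fail) {
        try {
            return parse_impl(fail);
        } catch (const std::exception& e) {
            return Status::InternalError(e.what());
        }
    }

    int main() {
        std::cout << parse(false).ok() << " " << parse(true).msg << "\n";
    }

Keeping the try/catch at one entry point means internal helpers stay exception-oblivious and callers only ever see a Status.
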
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 078081593c5..16288541415 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -90,13 +90,11 @@ struct ParseContext {
 // 1. parse variant from raw json string
 // 2. finalize variant column to each subcolumn least commn types, default ignore sparse sub columns
 // 3. encode sparse sub columns
-Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos,
-                                        const ParseContext& ctx);
 Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
                              const ParseContext& ctx);
 void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
                               bool ignore_sparse = true);
-Status encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& variant_pos);
+Status encode_variant_sparse_subcolumns(ColumnObject& column);
 
 // Pick the tablet schema with the highest schema version as the reference.
 // Then update all variant columns to there least common types.
@@ -117,15 +115,14 @@ void update_least_sparse_column(const std::vector<TabletSchemaSPtr>& schemas,
                                 const std::unordered_set<PathInData, PathInData::Hash>& path_set);
 
 // inherit attributes like index/agg info from it's parent column
-void inherit_root_attributes(TabletSchemaSPtr& schema);
-
-// Rebuild schema from original schema by extend dynamic columns generated from ColumnObject.
-// Block consists of two parts, dynamic part of columns and static part of columns.
-//     static     extracted
-// | --------- | ----------- |
-// The static ones are original tablet_schame columns
-void rebuild_schema_and_block(const TabletSchemaSPtr& original, const std::vector<int>& variant_pos,
-                              Block& flush_block, TabletSchemaSPtr& flush_schema);
+void inherit_column_attributes(TabletSchemaSPtr& schema);
+
+void inherit_column_attributes(const TabletColumn& source, TabletColumn& target,
+                               TabletSchemaSPtr& target_schema);
+
+// get sorted subcolumns of variant
+vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
+        const vectorized::ColumnObject::Subcolumns& subcolumns);
 
 // Extract json data from source with path
 Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst);
diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h
index e708cda164e..986f957d72b 100644
--- a/be/src/vec/data_types/data_type.h
+++ b/be/src/vec/data_types/data_type.h
@@ -412,5 +412,9 @@ inline bool is_complex_type(const DataTypePtr& data_type) {
     return which.is_array() || which.is_map() || which.is_struct();
 }
 
+inline bool is_variant_type(const DataTypePtr& data_type) {
+    return WhichDataType(data_type).is_variant_type();
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp
index 4d8a3020375..e9015db653a 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -37,10 +37,11 @@ namespace doris {
 
 namespace vectorized {
 
-Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column,
-                                                  MysqlRowBuffer<false>& row_buffer, int row_idx,
-                                                  bool col_const,
-                                                  const FormatOptions& options) const {
+template <bool is_binary_format>
+Status DataTypeObjectSerDe::_write_column_to_mysql(const IColumn& column,
+                                                   MysqlRowBuffer<is_binary_format>& row_buffer,
+                                                   int row_idx, bool col_const,
+                                                   const FormatOptions& options) const {
     const auto& variant = assert_cast<const ColumnObject&>(column);
     if (!variant.is_finalized()) {
         const_cast<ColumnObject&>(variant).finalize();
@@ -67,6 +68,20 @@ Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column,
     return Status::OK();
 }
 
+Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                  bool col_const,
+                                                  const FormatOptions& options) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
+}
+
+Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                  bool col_const,
+                                                  const FormatOptions& options) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
+}
+
 void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result,
                                                   Arena* mem_pool, int32_t col_id,
                                                   int row_num) const {
@@ -75,12 +90,14 @@ void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr
         const_cast<ColumnObject&>(variant).finalize();
     }
     result.writeKey(col_id);
+    std::string value_str;
+    if (!variant.serialize_one_row_to_string(row_num, &value_str)) {
+        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}",
+                               variant.dump_structure());
+    }
     JsonbParser json_parser;
-    CHECK(variant.get_rowstore_column() != nullptr);
-    // use original document
-    const auto& data_ref = variant.get_rowstore_column()->get_data_at(row_num);
     // encode as jsonb
-    bool succ = json_parser.parse(data_ref.data, data_ref.size);
+    bool succ = json_parser.parse(value_str.data(), value_str.size());
     // maybe more graceful, it is ok to check here since data could be parsed
     CHECK(succ);
     result.writeStartBinary();
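
The jsonb change above drops the cached rowstore lookup: the variant row is serialized to a JSON string on demand and then re-parsed into the binary form. A toy sketch of that flow; ToyVariant and ToyJsonbWriter are hypothetical stand-ins (not Doris APIs), and where the real code parses JSON into jsonb this sketch merely appends the payload:

    #include <iostream>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for a variant column: one JSON document per row.
    struct ToyVariant {
        std::vector<std::string> rows;
        bool serialize_one_row_to_string(size_t row, std::string* out) const {
            if (row >= rows.size()) return false;
            *out = rows[row];
            return true;
        }
    };

    // Hypothetical stand-in for a binary (jsonb-like) writer.
    struct ToyJsonbWriter {
        std::string buffer;
        void write_key(int col_id) { buffer += "[col " + std::to_string(col_id) + "]"; }
        void write_binary(const std::string& json) { buffer += json; }
    };

    // Mirrors the patched write_one_cell_to_jsonb: serialize the row on demand
    // instead of relying on a precomputed rowstore column.
    void write_one_cell(const ToyVariant& v, ToyJsonbWriter& w, int col_id, size_t row) {
        w.write_key(col_id);
        std::string value;
        if (!v.serialize_one_row_to_string(row, &value)) {
            throw std::runtime_error("failed to serialize variant row");
        }
        w.write_binary(value);
    }

    int main() {
        ToyVariant v{{R"({"a":1})", R"({"b":[1,2]})"}};
        ToyJsonbWriter w;
        write_one_cell(v, w, 7, 1);
        std::cout << w.buffer << "\n"; // [col 7]{"b":[1,2]}
    }
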
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h
index 80554d3dbef..66178f0ecb3 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.h
+++ b/be/src/vec/data_types/serde/data_type_object_serde.h
@@ -82,9 +82,7 @@ public:
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
                                  int row_idx, bool col_const,
-                                 const FormatOptions& options) const override {
-        return Status::NotSupported("write_column_to_mysql with type " + column.get_name());
-    }
+                                 const FormatOptions& options) const override;
 
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
                                  int row_idx, bool col_const,
@@ -96,6 +94,11 @@ public:
                                std::vector<StringRef>& buffer_list) const override {
         return Status::NotSupported("write_column_to_orc with type " + column.get_name());
     }
+
+private:
+    template <bool is_binary_format>
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const, const FormatOptions& options) const;
 };
 } // namespace vectorized
 } // namespace doris
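
The header change pairs with the .cpp hunk above: both public write_column_to_mysql overloads now forward to one private template <bool is_binary_format> helper, so the text and binary MySQL protocol paths share a single implementation. A minimal sketch of the pattern with illustrative names (not the real Doris signatures):

    #include <iostream>
    #include <string>

    template <bool is_binary_format>
    struct RowBuffer {
        std::string out;
        void push(const std::string& s) {
            // A real binary protocol would length-prefix; the text path writes as-is.
            out += is_binary_format ? "<bin>" + s : s;
        }
    };

    class SerDe {
    public:
        // Two public overloads, one per protocol flavor...
        void write(RowBuffer<true>& buf, const std::string& v) const { _write(buf, v); }
        void write(RowBuffer<false>& buf, const std::string& v) const { _write(buf, v); }

    private:
        // ...delegating to a single private template, as in the patch.
        template <bool is_binary_format>
        void _write(RowBuffer<is_binary_format>& buf, const std::string& v) const {
            buf.push(v);
        }
    };

    int main() {
        RowBuffer<true> b;
        RowBuffer<false> t;
        SerDe s;
        s.write(b, "x");
        s.write(t, "x");
        std::cout << b.out << " / " << t.out << "\n";
    }

This also replaces the old NotSupported stub for the binary buffer, so server-side prepared statements (which use the binary protocol) can read variant columns.
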
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 8f3163c36c4..9a10ba8cf35 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -445,7 +445,7 @@ Status NewOlapScanner::_init_variant_columns() {
             }
         }
     }
-    schema_util::inherit_root_attributes(tablet_schema);
+    schema_util::inherit_column_attributes(tablet_schema);
     return Status::OK();
 }
 
diff --git a/be/src/vec/functions/function_variant_element.cpp b/be/src/vec/functions/function_variant_element.cpp
index 89256635279..84ddc3b8046 100644
--- a/be/src/vec/functions/function_variant_element.cpp
+++ b/be/src/vec/functions/function_variant_element.cpp
@@ -32,6 +32,7 @@
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_object.h"
 #include "vec/columns/column_string.h"
+#include "vec/columns/subcolumn_tree.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/block.h"
@@ -43,6 +44,7 @@
 #include "vec/functions/function.h"
 #include "vec/functions/function_helpers.h"
 #include "vec/functions/simple_function_factory.h"
+#include "vec/json/path_in_data.h"
 
 namespace doris::vectorized {
 
@@ -128,8 +130,45 @@ private:
             *result = ColumnObject::create(true, type, std::move(result_column));
             return Status::OK();
         } else {
-            return Status::NotSupported("Not support element_at with none scalar variant {}",
-                                        src.debug_string());
+            auto mutable_src = src.clone_finalized();
+            auto* mutable_ptr = assert_cast<ColumnObject*>(mutable_src.get());
+            PathInData path(field_name);
+            ColumnObject::Subcolumns subcolumns = mutable_ptr->get_subcolumns();
+            const auto* node = subcolumns.find_exact(path);
+            auto result_col = ColumnObject::create(true, false /*should not create root*/);
+            if (node != nullptr) {
+                std::vector<decltype(node)> nodes;
+                PathsInData paths;
+                ColumnObject::Subcolumns::get_leaves_of_node(node, nodes, paths);
+                ColumnObject::Subcolumns new_subcolumns;
+                for (const auto* n : nodes) {
+                    PathInData new_path = n->path.copy_pop_front();
+                    VLOG_DEBUG << "add node " << new_path.get_path()
+                               << ", data size: " << n->data.size()
+                               << ", finalized size: " << n->data.get_finalized_column().size()
+                               << ", common type: " << n->data.get_least_common_type()->get_name();
+                    // if new_path is empty, indicate it's the root column, but adding a root will return false when calling add
+                    if (!new_subcolumns.add(new_path, n->data)) {
+                        VLOG_DEBUG << "failed to add node " << new_path.get_path();
+                    }
+                }
+                // handle the root node
+                if (new_subcolumns.empty() && !nodes.empty()) {
+                    CHECK_EQ(nodes.size(), 1);
+                    new_subcolumns.create_root(ColumnObject::Subcolumn {
+                            nodes[0]->data.get_finalized_column_ptr()->assume_mutable(),
+                            nodes[0]->data.get_least_common_type(), true, true});
+                }
+                auto container = ColumnObject::create(std::move(new_subcolumns), true);
+                result_col->insert_range_from(*container, 0, container->size());
+            } else {
+                result_col->insert_many_defaults(src.size());
+            }
+            *result = result_col->get_ptr();
+            VLOG_DEBUG << "dump new object "
+                       << static_cast<const ColumnObject*>(result_col.get())->debug_string()
+                       << ", path " << path.get_path();
+            return Status::OK();
         }
     }
 
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 86c1d2d6669..e5f945953f6 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/olap/olap_data_convertor.h"
 
+#include <memory>
 #include <new>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
@@ -42,6 +43,7 @@
 #include "vec/columns/column_struct.h"
 #include "vec/columns/column_vector.h"
 #include "vec/common/assert_cast.h"
+#include "vec/common/schema_util.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_agg_state.h"
 #include "vec/data_types/data_type_array.h"
@@ -214,6 +216,16 @@ void OlapBlockDataConvertor::set_source_content(const vectorized::Block* block,
     }
 }
 
+Status OlapBlockDataConvertor::set_source_content_with_specifid_column(
+        const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows, uint32_t cid) {
+    DCHECK(num_rows > 0);
+    DCHECK(row_pos + num_rows <= typed_column.column->size());
+    DCHECK(cid < _convertors.size());
+    RETURN_IF_CATCH_EXCEPTION(
+            { _convertors[cid]->set_source_column(typed_column, row_pos, num_rows); });
+    return Status::OK();
+}
+
 Status OlapBlockDataConvertor::set_source_content_with_specifid_columns(
         const vectorized::Block* block, size_t row_pos, size_t num_rows,
         std::vector<uint32_t> cids) {
@@ -1078,8 +1090,6 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column(
                     ? assert_cast<const vectorized::ColumnObject&>(*typed_column.column)
                     : assert_cast<const vectorized::ColumnObject&>(
                               nullable_column->get_nested_column());
-
-    const_cast<ColumnObject&>(variant).finalize_if_not();
     if (variant.is_null_root()) {
         auto root_type = make_nullable(std::make_shared<ColumnObject::MostCommonType>());
         auto root_col = root_type->create_column();
@@ -1087,19 +1097,25 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column(
         const_cast<ColumnObject&>(variant).create_root(root_type, std::move(root_col));
         variant.check_consistency();
     }
-    auto root_of_variant = variant.get_root();
-    auto nullable = assert_cast<const ColumnNullable*>(root_of_variant.get());
-    CHECK(nullable);
-    _root_data_column = assert_cast<const ColumnString*>(&nullable->get_nested_column());
-    _root_data_convertor->set_source_column({root_of_variant->get_ptr(), nullptr, ""}, row_pos,
-                                            num_rows);
+    // ensure data finalized
+    _source_column_ptr = &const_cast<ColumnObject&>(variant);
+    _source_column_ptr->finalize(false);
+    _root_data_convertor = std::make_unique<OlapColumnDataConvertorVarChar>(true);
+    _root_data_convertor->set_source_column(
+            {_source_column_ptr->get_root()->get_ptr(), nullptr, ""}, row_pos, num_rows);
     OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos,
                                                                            num_rows);
 }
 
 // convert root data
 Status OlapBlockDataConvertor::OlapColumnDataConvertorVariant::convert_to_olap() {
-    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, _root_data_column));
+    RETURN_IF_ERROR(vectorized::schema_util::encode_variant_sparse_subcolumns(*_source_column_ptr));
+#ifndef NDEBUG
+    _source_column_ptr->check_consistency();
+#endif
+    const auto* nullable = assert_cast<const ColumnNullable*>(_source_column_ptr->get_root().get());
+    const auto* root_column = assert_cast<const ColumnString*>(&nullable->get_nested_column());
+    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, root_column));
     return Status::OK();
 }
 
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index 0ec720fcdc1..764a7a4a7c3 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -34,6 +34,7 @@
 #include "olap/uint24.h"
 #include "runtime/collection_value.h"
 #include "util/slice.h"
+#include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_object.h"
 #include "vec/columns/column_string.h"
@@ -77,6 +78,9 @@ public:
     void set_source_content(const vectorized::Block* block, size_t row_pos, size_t num_rows);
     Status set_source_content_with_specifid_columns(const vectorized::Block* block, size_t row_pos,
                                                     size_t num_rows, std::vector<uint32_t> cids);
+    Status set_source_content_with_specifid_column(const ColumnWithTypeAndName& typed_column,
+                                                   size_t row_pos, size_t num_rows, uint32_t cid);
+
     void clear_source_content();
     std::pair<Status, IOlapColumnDataAccessor*> convert_column_data(size_t cid);
     void add_column_data_convertor(const TabletColumn& column);
@@ -487,8 +491,8 @@ private:
 
     class OlapColumnDataConvertorVariant : public OlapColumnDataConvertorBase {
     public:
-        OlapColumnDataConvertorVariant()
-                : _root_data_convertor(std::make_unique<OlapColumnDataConvertorVarChar>(true)) {}
+        OlapColumnDataConvertorVariant() = default;
+
         void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
                                size_t num_rows) override;
         Status convert_to_olap() override;
@@ -497,10 +501,11 @@ private:
         const void* get_data_at(size_t offset) const override;
 
     private:
-        // encodes sparsed columns
-        const ColumnString* _root_data_column;
-        // _nullmap contains null info for this variant
+        // // encodes sparsed columns
+        // const ColumnString* _root_data_column;
+        // // _nullmap contains null info for this variant
         std::unique_ptr<OlapColumnDataConvertorVarChar> _root_data_convertor;
+        ColumnObject* _source_column_ptr;
     };
 
 private:
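
The convertor rework above splits the work into two phases: set_source_column only finalizes the variant and records a pointer to the source ColumnObject, while convert_to_olap encodes the sparse subcolumns and then converts the root. A toy two-phase convertor showing the split (hypothetical types, not the Doris interfaces):

    #include <cassert>
    #include <iostream>
    #include <string>

    // Hypothetical source column standing in for ColumnObject.
    struct VariantColumn {
        std::string root;
        bool sparse_encoded = false;
        void encode_sparse() { sparse_encoded = true; }
    };

    class VariantConvertor {
    public:
        // Phase 1: cheap -- remember where the data lives.
        void set_source_column(VariantColumn* col) { _source = col; }

        // Phase 2: heavy -- encode sparse subcolumns, then convert the root,
        // matching the order in the patched convert_to_olap().
        std::string convert_to_olap() {
            assert(_source != nullptr);
            _source->encode_sparse();
            return "olap(" + _source->root + ")";
        }

    private:
        VariantColumn* _source = nullptr;
    };

    int main() {
        VariantColumn col{R"({"a":1})"};
        VariantConvertor conv;
        conv.set_source_column(&col);
        std::cout << conv.convert_to_olap() << " sparse=" << col.sparse_encoded << "\n";
    }
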
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
index 0a8dbf5bad9..fda46d13fff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
@@ -191,12 +191,8 @@ public class UpdateStmt extends DdlStmt {
 
         // step3: generate select list and insert column name list in insert stmt
         boolean isMow = ((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite();
-        boolean hasVariant = false;
         int setExprCnt = 0;
         for (Column column : targetTable.getColumns()) {
-            if (column.getType().isVariantType()) {
-                hasVariant = true;
-            }
             for (BinaryPredicate setExpr : setExprs) {
                 Expr lhs = setExpr.getChild(0);
                 if (((SlotRef) lhs).getColumn().equals(column)) {
@@ -204,13 +200,10 @@ public class UpdateStmt extends DdlStmt {
                 }
             }
         }
-        // 1.table with sequence col cannot use partial update cause in MOW, we encode pk
+        // table with sequence col cannot use partial update cause in MOW, we encode pk
         // with seq column but we don't know which column is sequence in update
-        // 2. variant column update schema during load, so implement partial update is complicated,
-        //  just ignore it at present
         if (isMow && ((OlapTable) targetTable).getSequenceCol() == null
-                && setExprCnt <= targetTable.getColumns().size() * 3 / 10
-                && !hasVariant) {
+                && setExprCnt <= targetTable.getColumns().size() * 3 / 10) {
             isPartialUpdate = true;
         }
         Optional<Column> sequenceMapCol = Optional.empty();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
index 48766caa5ce..542dab31a01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
@@ -159,7 +159,7 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
 
         boolean isPartialUpdate = targetTable.getEnableUniqueKeyMergeOnWrite()
                 && selectItems.size() < targetTable.getColumns().size()
-                && !targetTable.hasVariantColumns() && targetTable.getSequenceCol() == null
+                && targetTable.getSequenceCol() == null
+                && partialUpdateColNameToExpression.size() <= targetTable.getFullSchema().size() * 3 / 10;
 
         List<String> partialUpdateColNames = new ArrayList<>();
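
Both FE changes relax the same gate: a merge-on-write UPDATE may take the partial-update path when the table has no sequence column and the SET list touches at most 3/10 of the columns; the former variant-column exclusion is removed. The rule restated as a small predicate (a sketch of the condition, not FE code):

    #include <iostream>

    // Mirrors the condition kept in UpdateStmt/UpdateCommand after the patch:
    // merge-on-write, no sequence column, and set columns <= 3/10 of all
    // columns. Note the former hasVariant / hasVariantColumns() check is gone.
    bool can_use_partial_update(bool is_mow, bool has_sequence_col,
                                int set_expr_cnt, int total_columns) {
        return is_mow && !has_sequence_col && set_expr_cnt <= total_columns * 3 / 10;
    }

    int main() {
        std::cout << can_use_partial_update(true, false, 2, 10) << "\n";  // 1
        std::cout << can_use_partial_update(true, false, 4, 10) << "\n";  // 0
        std::cout << can_use_partial_update(true, true, 2, 10) << "\n";   // 0
    }
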
diff --git a/regression-test/data/variant_p0/delete_update.out b/regression-test/data/variant_p0/delete_update.out
index 0e478bdeb0d..4390610c21d 100644
--- a/regression-test/data/variant_p0/delete_update.out
+++ b/regression-test/data/variant_p0/delete_update.out
@@ -7,10 +7,17 @@
 
 -- !sql --
 2      {"updated_value":123}   {"updated_value":123}
-6      {"a":4,"b":[4],"c":4.0} {"updated_value" : 123}
+6      {"a":4,"b":[4],"c":4.1} {"updated_value" : 123}
 7      {"updated_value":1111}  yyy
 
 -- !sql --
 2      {"updated_value":123}   {"updated_value":123}
-6      {"a":4,"b":[4],"c":4.0} {"updated_value" : 123}
+6      {"a":4,"b":[4],"c":4.1} {"updated_value" : 123}
+
+-- !sql --
+1      "ddddddddddd"   1111    199     10      {"new_data1":1}
+2      "eeeeee"        2222    299     20      {"new_data2":2}
+3      "aaaaa" 3333    399     30      {"new_data3":3}
+4      "bbbbbbbb"      4444    499     40      {"new_data4":4}
+5      "cccccccccccc"  5555    599     50      {"new_data5":5}
 
diff --git a/regression-test/data/variant_p0/partial_update_parallel1.csv b/regression-test/data/variant_p0/partial_update_parallel1.csv
new file mode 100644
index 00000000000..4ba84bb7785
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel1.csv
@@ -0,0 +1,5 @@
+1,"ddddddddddd"
+2,"eeeeee"
+3,"aaaaa"
+4,"bbbbbbbb"
+5,"cccccccccccc"
diff --git a/regression-test/data/variant_p0/partial_update_parallel2.csv b/regression-test/data/variant_p0/partial_update_parallel2.csv
new file mode 100644
index 00000000000..1560d6d3261
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel2.csv
@@ -0,0 +1,5 @@
+1,1111,199
+2,2222,299
+3,3333,399
+4,4444,499
+5,5555,599
diff --git a/regression-test/data/variant_p0/partial_update_parallel3.csv b/regression-test/data/variant_p0/partial_update_parallel3.csv
new file mode 100644
index 00000000000..17abeef1a9c
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel3.csv
@@ -0,0 +1,5 @@
+1,10,{"new_data1" : 1}
+2,20,{"new_data2" : 2}
+3,30,{"new_data3" : 3}
+4,40,{"new_data4" : 4}
+5,50,{"new_data5" : 5}
diff --git a/regression-test/data/variant_p0/partial_update_parallel4.csv b/regression-test/data/variant_p0/partial_update_parallel4.csv
new file mode 100644
index 00000000000..0a7cbd412fa
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel4.csv
@@ -0,0 +1,3 @@
+1,1
+3,1
+5,1
diff --git a/regression-test/data/variant_p0/variant_with_rowstore.out b/regression-test/data/variant_p0/variant_with_rowstore.out
index d7d759baad3..6c34622bec8 100644
--- a/regression-test/data/variant_p0/variant_with_rowstore.out
+++ b/regression-test/data/variant_p0/variant_with_rowstore.out
@@ -23,3 +23,12 @@
 5      {"a":1234,"xxxx":"kaana"}       {"a":1234,"xxxx":"kaana"}
 6      {"a":1234,"xxxx":"kaana"}       {"a":1234,"xxxx":"kaana"}
 
+-- !point_select --
+-3     {"a":1,"b":1.5,"c":[1,2,3]}     {"a":1,"b":1.5,"c":[1,2,3]}
+
+-- !point_select --
+-2     {"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}}       {"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}}
+
+-- !point_select --
+-1     {"a":1123}      {"a":1123}
+
diff --git a/regression-test/suites/variant_github_events_p0_new/load.groovy b/regression-test/suites/variant_github_events_p0_new/load.groovy
index 0be0f205b69..c063ebecf26 100644
--- a/regression-test/suites/variant_github_events_p0_new/load.groovy
+++ b/regression-test/suites/variant_github_events_p0_new/load.groovy
@@ -95,6 +95,36 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
     sql """
         insert into github_events_2 select 1, cast(v["repo"]["name"] as string) FROM github_events;
     """
+    // insert batches of nulls
+    for(int t = 0; t <= 10; t += 1){ 
+        long k = 9223372036854775107 + t
+        sql """INSERT INTO github_events VALUES (${k}, NULL)"""
+    }
+    sql """ALTER TABLE github_events SET("bloom_filter_columns" = "v")"""
+    // wait for add bloom filter finished
+    def getJobState = { tableName ->
+         def jobStateResult = sql """  SHOW ALTER TABLE COLUMN WHERE IndexName='github_events' ORDER BY createtime DESC LIMIT 1 """
+         return jobStateResult[0][9]
+    }
+    int max_try_time = 200
+    while (max_try_time--){
+        String result = getJobState("github_events")
+        if (result == "FINISHED") {
+            break
+        } else {
+            sleep(2000)
+            if (max_try_time < 1){
+                assertEquals(1,2)
+            }
+        }
+    }
+    sql """ALTER TABLE github_events ADD COLUMN v2 variant DEFAULT NULL"""
+    for(int t = 0; t <= 10; t += 1){ 
+        long k = 9223372036854775107 + t
+        sql """INSERT INTO github_events VALUES (${k}, '{"aaaa" : 1234, "bbbb" : "11ssss"}', '{"xxxx" : 1234, "yyyy" : [1.111]}')"""
+    }
+    sql """ALTER TABLE github_events DROP COLUMN v2"""
+    sql """DELETE FROM github_events where k >= 9223372036854775107"""
 
     qt_sql_select_count """ select count(*) from github_events_2; """
 }
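
The load.groovy addition above polls SHOW ALTER TABLE COLUMN until the job reports FINISHED, sleeping 2s between checks and failing the test after ~200 tries. The same poll-until-done shape as a small sketch; get_state here is a stand-in for the SHOW ALTER query:

    #include <chrono>
    #include <functional>
    #include <iostream>
    #include <string>
    #include <thread>

    // Poll `get_state` until it returns "FINISHED" or we run out of tries,
    // mirroring the getJobState loop in load.groovy.
    bool wait_for_finished(const std::function<std::string()>& get_state,
                           int max_tries, std::chrono::milliseconds interval) {
        for (int i = 0; i < max_tries; ++i) {
            if (get_state() == "FINISHED") return true;
            std::this_thread::sleep_for(interval);
        }
        return false;  // the caller should treat this as a test failure
    }

    int main() {
        int calls = 0;
        auto fake_state = [&] {
            return ++calls < 3 ? std::string("RUNNING") : std::string("FINISHED");
        };
        std::cout << wait_for_finished(fake_state, 10, std::chrono::milliseconds(1)) << "\n";
    }
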
diff --git a/regression-test/suites/variant_p0/delete_update.groovy b/regression-test/suites/variant_p0/delete_update.groovy
index bbd999559b4..2b126b4c3a6 100644
--- a/regression-test/suites/variant_p0/delete_update.groovy
+++ b/regression-test/suites/variant_p0/delete_update.groovy
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
 suite("regression_test_variant_delete_and_update", "variant_type"){
     // MOR
     def table_name = "var_delete_update"
@@ -30,14 +32,14 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
     """
     // test mor table
 
-    sql """insert into ${table_name} values (1, '{"a" : 1, "b" : [1], "c": 1.0}')"""
-    sql """insert into ${table_name} values (2, '{"a" : 2, "b" : [1], "c": 2.0}')"""
-    sql """insert into ${table_name} values (3, '{"a" : 3, "b" : [3], "c": 3.0}')"""
-    sql """insert into ${table_name} values (4, '{"a" : 4, "b" : [4], "c": 4.0}')"""
-    sql """insert into ${table_name} values (5, '{"a" : 5, "b" : [5], "c": 5.0}')"""
+    sql """insert into ${table_name} values (1, '{"a":1,"b":[1],"c":1.0}')"""
+    sql """insert into ${table_name} values (2, '{"a":2,"b":[1],"c":2.0}')"""
+    sql """insert into ${table_name} values (3, '{"a":3,"b":[3],"c":3.0}')"""
+    sql """insert into ${table_name} values (4, '{"a":4,"b":[4],"c":4.0}')"""
+    sql """insert into ${table_name} values (5, '{"a":5,"b":[5],"c":5.0}')"""
 
     sql "delete from ${table_name} where k = 1"
-    sql """update ${table_name} set v = '{"updated_value" : 123}' where k = 2"""
+    sql """update ${table_name} set v = '{"updated_value":123}' where k = 2"""
     qt_sql "select * from ${table_name} order by k"
 
     // MOW
@@ -46,41 +48,125 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
     sql """
         CREATE TABLE IF NOT EXISTS ${table_name} (
             k bigint,
-            v variant,
+            v  variant,
             vs string 
         )
         UNIQUE KEY(`k`)
-        DISTRIBUTED BY HASH(k) BUCKETS 3
-        properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true");
+        DISTRIBUTED BY HASH(k) BUCKETS 4
+        properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true");
     """
     sql "insert into var_delete_update_mow select k, cast(v as string), cast(v as string) from var_delete_update"
     sql "delete from ${table_name} where k = 1"
     sql "delete from ${table_name} where k in (select k from var_delete_update_mow where k in (3, 4, 5))"
 
-    sql """insert into ${table_name} values (6, '{"a" : 4, "b" : [4], "c": 4.0}', 'xxx')"""
-    sql """insert into ${table_name} values (7, '{"a" : 4, "b" : [4], "c": 4.0}', 'yyy')"""
-    sql """update ${table_name} set vs = '{"updated_value" : 123}' where k = 6"""
-    sql """update ${table_name} set v = '{"updated_value" : 1111}' where k = 7"""
-    qt_sql "select * from ${table_name} order by k"
+    sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.1}', 'xxx')"""
+    sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.1}', 'yyy')"""
+    sql """update var_delete_update_mow set vs = '{"updated_value" : 123}' where k = 6"""
+    sql """update var_delete_update_mow set v = '{"updated_value":1111}' where k = 7"""
+    qt_sql "select * from var_delete_update_mow order by k"
 
     sql """delete from ${table_name} where v = 'xxx' or vs = 'yyy'"""
     sql """delete from ${table_name} where vs = 'xxx' or vs = 'yyy'"""
     qt_sql "select * from ${table_name} order by k"
 
     // delete & insert concurrently
-
+    sql "set enable_unique_key_partial_update=true;"
+    sql "sync"
     t1 = Thread.startDaemon {
         for (int k = 1; k <= 60; k++) {
-            int x = k % 10;
-            sql """insert into ${table_name} values(${x}, '${x}', '{"k${x}" : ${x}}')"""
+            int x = new Random().nextInt(61) % 10;
+            sql """insert into ${table_name}(k,vs) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')"""
         } 
     }
     t2 = Thread.startDaemon {
         for (int k = 1; k <= 60; k++) {
-            int x = k % 10;
-            sql """delete from ${table_name} where k = ${x} """
+            int x = new Random().nextInt(61) % 10;
+            sql """insert into ${table_name}(k,v) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')"""
+        } 
+    }
+    t3 = Thread.startDaemon {
+        for (int k = 1; k <= 60; k++) {
+            int x = new Random().nextInt(61) % 10;
+            sql """insert into ${table_name}(k,v) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')"""
+        } 
+    }
+    t1.join()
+    t2.join()
+    t3.join()
+    sql "sync"
+
+    sql "set enable_unique_key_partial_update=false;"
+     // case 1: concurrent partial update
+    def tableName = "test_primary_key_partial_update_parallel"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321",
+                `var` variant NULL)
+                UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true", "store_row_column" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2, '{"id":2, "name":"doris2","score":2000,"test":223,"dft":2}'),
+        (1, "doris", 1000, 123, 1, '{"id":1, "name":"doris","score":1000,"test":123,"dft":1}'),
+        (5, "doris5", 5000, 523, 5, '{"id":5, "name":"doris5","score":5000,"test":523,"dft":5}'),
+        (4, "doris4", 4000, 423, 4, '{"id":4, "name":"doris4","score":4000,"test":423,"dft":4}'),
+        (3, "doris3", 3000, 323, 3, '{"id":3, "name":"doris3","score":3000,"test":323,"dft":3}');"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
         }
     }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft,var'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
     t1.join()
     t2.join()
+    t3.join()
+
+    sql "sync"
+
+    if (!isCloudMode()) {
+        qt_sql """ select * from ${tableName} order by id;"""
+    }
 }
\ No newline at end of file
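
The rows added to delete_update.out earlier are the result of three concurrent partial loads (id+name, id+score+test, id+dft+var) merging into complete rows keyed by id. A toy model of that merge-on-write partial-update behavior (not the BE implementation):

    #include <iostream>
    #include <map>
    #include <string>

    using Row = std::map<std::string, std::string>;  // column -> value

    // Apply a partial update: only the provided columns change; the rest of
    // the row (from earlier loads) is carried over, as MoW partial update does.
    void apply_partial(std::map<int, Row>& table, int id, const Row& cols) {
        for (const auto& [col, val] : cols) table[id][col] = val;
    }

    int main() {
        std::map<int, Row> table;
        apply_partial(table, 1, {{"name", "\"ddddddddddd\""}});                      // load 1
        apply_partial(table, 1, {{"score", "1111"}, {"test", "199"}});               // load 2
        apply_partial(table, 1, {{"dft", "10"}, {"var", R"({"new_data1":1})"}});     // load 3
        for (const auto& [col, val] : table[1]) std::cout << col << "=" << val << " ";
        std::cout << "\n";  // full row assembled from three partial loads
    }
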
diff --git a/regression-test/suites/variant_p0/test_compaction_extract_root.groovy b/regression-test/suites/variant_p0/test_compaction_extract_root.groovy
index 69c330fa98a..43f1048f151 100644
--- a/regression-test/suites/variant_p0/test_compaction_extract_root.groovy
+++ b/regression-test/suites/variant_p0/test_compaction_extract_root.groovy
@@ -85,8 +85,10 @@ suite("test_compaction_extract_root", "nonConcurrent") {
         union  all select 5, '{"a": 1123}' as json_str union all select 5, '{"a": 11245, "b" : 42005}' as json_str from numbers("number" = "4096") limit 4096 ;"""
 
     // // fix cast to string tobe {}
-    qt_select_b_1 """ SELECT count(cast(v['b'] as string)) FROM ${tableName};"""
-    qt_select_b_2 """ SELECT count(cast(v['b'] as int)) FROM ${tableName};"""
+    qt_select_b_1 """ SELECT count(cast(v['b'] as string)) FROM test_t"""
+    qt_select_b_2 """ SELECT count(cast(v['b'] as int)) FROM test_t"""
+    // TODO, sparse columns with v['b'] will not be merged in hierarchical_data_reader with sparse columns
+    // qt_select_b_2 """ select v['b'] from test_t where  cast(v['b'] as string) != '42005' and  cast(v['b'] as string) != '42004' and  cast(v['b'] as string) != '42003' order by cast(v['b'] as string); """
 
     qt_select_1_bfcompact """select v['b'] from test_t where k = 0 and cast(v['a'] as int) = 11245;"""
 
@@ -140,8 +142,10 @@ suite("test_compaction_extract_root", "nonConcurrent") {
     }
     assert (rowCount <= 8)
     // fix cast to string tobe {}
-    qt_select_b_3 """ SELECT count(cast(v['b'] as string)) FROM ${tableName};"""
-    qt_select_b_4 """ SELECT count(cast(v['b'] as int)) FROM ${tableName};"""
+    qt_select_b_3 """ SELECT count(cast(v['b'] as string)) FROM test_t"""
+    qt_select_b_4 """ SELECT count(cast(v['b'] as int)) FROM test_t"""
+    // TODO, sparse columns with v['b'] will not be merged in hierarchical_data_reader with sparse columns
+    // qt_select_b_5 """ select v['b'] from test_t where  cast(v['b'] as string) != '42005' and  cast(v['b'] as string) != '42004' and  cast(v['b'] as string) != '42003' order by cast(v['b'] as string); """
 
     qt_select_1 """select v['b'] from test_t where k = 0 and cast(v['a'] as int) = 11245;"""
     set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1")
diff --git a/regression-test/suites/variant_p0/variant_with_rowstore.groovy b/regression-test/suites/variant_p0/variant_with_rowstore.groovy
index 58c245ee831..771f776b3e7 100644
--- a/regression-test/suites/variant_p0/variant_with_rowstore.groovy
+++ b/regression-test/suites/variant_p0/variant_with_rowstore.groovy
@@ -29,7 +29,6 @@ suite("regression_test_variant_rowstore", "variant_type"){
  
     def table_name = "var_rowstore"
     sql "DROP TABLE IF EXISTS ${table_name}"
-    set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")
 
     sql """
             CREATE TABLE IF NOT EXISTS ${table_name} (
@@ -63,4 +62,50 @@ suite("regression_test_variant_rowstore", "variant_type"){
     """
     sql """insert into ${table_name} select k, cast(v as string), cast(v as string) from var_rowstore"""
     qt_sql "select * from ${table_name} order by k limit 10"
+
+    // Parse url
+    def user = context.config.jdbcUser
+    def password = context.config.jdbcPassword
+    def realDb = "regression_test_variant_p0"
+    String jdbcUrl = context.config.jdbcUrl
+    String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
+    def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
+    def sql_port
+    if (urlWithoutSchema.indexOf("/") >= 0) {
+        // e.g: jdbc:mysql://locahost:8080/?a=b
+        sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
+    } else {
+        // e.g: jdbc:mysql://locahost:8080
+        sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
+    }
+    // set server side prepared statement url
+    def prepare_url = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb + "?&useServerPrepStmts=true"
+    table_name = "var_rs_pq"
+    sql "DROP TABLE IF EXISTS ${table_name}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant,
+                v1 variant
+            )
+            UNIQUE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS 1
+            properties("replication_num" = "1", "disable_auto_compaction" = "false", "store_row_column" = "true", "enable_unique_key_merge_on_write" = "true");
+    """
+    sql """insert into ${table_name} select k, cast(v as string), cast(v as string) from var_rowstore"""
+    def result1 = connect(user=user, password=password, url=prepare_url) {
+        def stmt = prepareStatement "select * from var_rs_pq where k = ?"
+        assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+        stmt.setInt(1, -3)
+        qe_point_select stmt
+        stmt.setInt(1, -2)
+        qe_point_select stmt
+        stmt.setInt(1, -1)
+        qe_point_select stmt
+
+        // def stmt1 = prepareStatement "select var['a'] from var_rs_pq where k = ?"
+        // assertEquals(stmt1.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+        // stmt.setInt(1, -3)
+        // qe_point_select stmt
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
