xiaokang commented on code in PR #34925:
URL: https://github.com/apache/doris/pull/34925#discussion_r1634138098


##########
be/src/olap/base_tablet.cpp:
##########
@@ -1252,6 +1256,10 @@ Status BaseTablet::update_delete_bitmap(const 
BaseTabletSPtr& self, const Tablet
         RowsetSharedPtr transient_rowset;
         RETURN_IF_ERROR(transient_rs_writer->build(transient_rowset));
         
rowset->rowset_meta()->merge_rowset_meta(*transient_rowset->rowset_meta());

Review Comment:
   add a function for Rowset to update rowset_meta and _schema



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -709,16 +707,127 @@ Status VerticalSegmentWriter::batch_block(const 
vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// for variant type, we should do following steps to fill content of block:
+// 1. set block data to data convertor, and get all flattened columns from 
variant subcolumns
+// 2. get sparse columns from previous sparse columns stripped in 
OlapColumnDataConvertorVariant
+// 3. merge current columns info(contains extracted columns) with previous 
merged_tablet_schema
+//    which will be used to contruct the new schema for rowset
+Status 
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) 
{
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." + 
entry->path.get_path();
+            const vectorized::DataTypePtr& final_data_type_from_object =
+                    entry->data.get_least_common_type();
+            vectorized::PathInDataBuilder full_path_builder;
+            auto full_path = 
full_path_builder.append(parent_column->name_lower_case(), false)
+                                     .append(entry->path.get_parts(), false)
+                                     .build();
+            return vectorized::schema_util::get_column_by_type(
+                    final_data_type_from_object, column_name,
+                    vectorized::schema_util::ExtraInfo {
+                            .unique_id = -1,
+                            .parent_unique_id = parent_column->unique_id(),
+                            .path_info = full_path});
+        };
+
+        CHECK(object_column.is_finalized());
+        // common extracted columns
+        for (const auto& entry :
+             
vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) 
{
+            if (entry->path.empty()) {
+                // already handled by parent column
+                continue;
+            }
+            CHECK(entry->data.is_finalized());
+            int current_column_id = column_id++;
+            TabletColumn tablet_column = generate_column_info(entry);
+            vectorized::schema_util::inherit_column_attributes(*parent_column, 
tablet_column,
+                                                               _flush_schema);
+            RETURN_IF_ERROR(_create_column_writer(current_column_id 
/*unused*/, tablet_column,
+                                                  _flush_schema));
+            _olap_data_convertor->set_source_content_with_specifid_column(
+                    {entry->data.get_finalized_column_ptr()->get_ptr(),
+                     entry->data.get_least_common_type(), 
tablet_column.name()},
+                    data.row_pos, data.num_rows, current_column_id);
+            // convert column data from engine format to storage layer format
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(current_column_id);
+            if (!status.ok()) {
+                return status;
+            }
+            RETURN_IF_ERROR(_column_writers[current_column_id]->append(
+                    column->get_nullmap(), column->get_data(), data.num_rows));
+            _flush_schema->append_column(tablet_column);
+            _olap_data_convertor->clear_source_content();
+        }
+        // sparse_columns
+        for (const auto& entry : 
vectorized::schema_util::get_sorted_subcolumns(
+                     object_column.get_sparse_subcolumns())) {
+            TabletColumn sparse_tablet_column = generate_column_info(entry);
+            _flush_schema->mutable_column_by_uid(parent_column->unique_id())
+                    .append_sparse_column(sparse_tablet_column);
+
+            // add sparse column to footer
+            auto* column_pb = _footer.mutable_columns(i);
+            _init_column_meta(column_pb->add_sparse_columns(), -1, 
sparse_tablet_column);
+        }
+    }
+
+    // Update rowset schema, tablet's tablet schema will be updated when build 
Rowset
+    // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
+    // ctx.tablet_schema:  A(bigint), B(double)
+    // => update_schema:   A(bigint), B(double), C(int), D(int)
+    std::lock_guard<std::mutex> lock(*(_opts.rowset_ctx->schema_lock));
+    if (_opts.rowset_ctx->merged_tablet_schema == nullptr) {
+        _opts.rowset_ctx->merged_tablet_schema = 
_opts.rowset_ctx->tablet_schema;
+    }
+    TabletSchemaSPtr update_schema;
+    RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
+            {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, 
update_schema));
+    CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns())
+            << "Rowset merge schema columns count is " << 
update_schema->num_columns()
+            << ", but flush_schema is larger " << _flush_schema->num_columns()
+            << " update_schema: " << update_schema->dump_structure()
+            << " flush_schema: " << _flush_schema->dump_structure();
+    _opts.rowset_ctx->merged_tablet_schema.swap(update_schema);
+    VLOG_DEBUG << "dump block " << data.block->dump_data();
+    VLOG_DEBUG << "dump rs schema: " << 
_opts.rowset_ctx->merged_tablet_schema->dump_full_schema();
+    VLOG_DEBUG << "rowset : " << _opts.rowset_ctx->rowset_id << ", seg id : " 
<< _segment_id;
+    return Status::OK();
+}
+
 Status VerticalSegmentWriter::write_batch() {
     if (_opts.rowset_ctx->partial_update_info &&
         _opts.rowset_ctx->partial_update_info->is_partial_update &&
         _opts.write_type == DataWriteType::TYPE_DIRECT &&
         !_opts.rowset_ctx->is_transient_rowset_writer) {
         for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
-            RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid)));
+            RETURN_IF_ERROR(
+                    _create_column_writer(cid, _tablet_schema->column(cid), 
_tablet_schema));
+        }
+        vectorized::Block full_block;
+        for (auto& data : _batched_blocks) {
+            RETURN_IF_ERROR(_append_block_with_partial_content(data, 
full_block));
         }
         for (auto& data : _batched_blocks) {
-            RETURN_IF_ERROR(_append_block_with_partial_content(data));
+            RowsInBlock full_rows_block {&full_block, data.row_pos, 
data.num_rows};
+            
RETURN_IF_ERROR(_append_block_with_variant_subcolumns(full_rows_block));

Review Comment:
   Its meaning is to append the block to the column writer, so it's OK.



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -813,6 +920,18 @@ Status VerticalSegmentWriter::write_batch() {
         _num_rows_written += data.num_rows;
     }
 
+    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
+        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
+        for (auto& data : _batched_blocks) {
+            RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data));
+        }
+    }
+
+    for (auto& column_writer : _column_writers) {

Review Comment:
   process normal columns and variant columns separately



##########
be/src/olap/rowset/rowset.h:
##########
@@ -139,6 +139,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
     // publish rowset to make it visible to read
     void make_visible(Version version);
     void set_version(Version version);
+    void set_schema(TabletSchemaSPtr new_schema) { _schema = new_schema; }

Review Comment:
   add comment for 1.2 and later _schema



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to