xiaokang commented on code in PR #24554:
URL: https://github.com/apache/doris/pull/24554#discussion_r1336778138


##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -759,76 +766,130 @@ Status 
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
     return Status::OK();
 }
 
-Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block,
-                                                TabletSchemaSPtr& 
flush_schema) {
-    if (block.rows() == 0) {
+Status BetaRowsetWriter::expand_variant_to_subcolumns(vectorized::Block& block,
+                                                      TabletSchemaSPtr& 
flush_schema) {
+    size_t num_rows = block.rows();
+    if (num_rows == 0) {
         return Status::OK();
     }
 
-    // Sanitize block to match exactly from the same type of frontend meta
-    vectorized::schema_util::FullBaseSchemaView schema_view;
-    schema_view.table_id = _context.tablet_schema->table_id();
-    vectorized::ColumnWithTypeAndName* variant_column =
-            block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
-    if (!variant_column) {
-        return Status::OK();
+    std::vector<int> variant_column_pos;
+    if (_context.tablet_schema->is_partial_update()) {
+        // check columns that used to do partial updates should not include 
variant
+        for (int i : _context.tablet_schema->get_update_cids()) {
+            if (_context.tablet_schema->columns()[i].is_variant_type()) {
+                return Status::InvalidArgument("Not implement partial updates 
for variant");
+            }
+        }
+    } else {
+        for (int i = 0; i < _context.tablet_schema->columns().size(); ++i) {
+            if (_context.tablet_schema->columns()[i].is_variant_type()) {
+                variant_column_pos.push_back(i);
+            }
+        }
     }
-    auto base_column = variant_column->column;
-    vectorized::ColumnObject& object_column =
-            
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
-    if (object_column.empty()) {
-        block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+
+    if (variant_column_pos.empty()) {
         return Status::OK();
     }
-    object_column.finalize();
-    // Has extended columns
-    
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+
+    try {
+        // Parse each variant column from raw string column
+        vectorized::schema_util::parse_variant_columns(block, 
variant_column_pos);
+        vectorized::schema_util::finalize_variant_columns(block, 
variant_column_pos,
+                                                          false /*not ingore 
sparse*/);
+        vectorized::schema_util::encode_variant_sparse_subcolumns(block, 
variant_column_pos);
+    } catch (const doris::Exception& e) {
+        // TODO more graceful, max_filter_ratio
+        LOG(WARNING) << "encounter execption " << e.to_string();
+        return Status::InternalError(e.to_string());
+    }
+
     // Dynamic Block consists of two parts, dynamic part of columns and static 
part of columns
-    //  static   dynamic
-    // | ----- | ------- |
+    //     static     extracted
+    // | --------- | ----------- |
     // The static ones are original _tablet_schame columns
-    flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema);
+    flush_schema = std::make_shared<TabletSchema>();
+    flush_schema->copy_from(*_context.tablet_schema);
     vectorized::Block flush_block(std::move(block));
-    // The dynamic ones are auto generated and extended, append them the the 
orig_block
-    for (auto& entry : object_column.get_subcolumns()) {
-        const std::string& column_name = entry->path.get_path();
-        auto column_iter = schema_view.column_name_to_column.find(column_name);
-        if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
-            // Column maybe dropped by light weight schema change DDL
-            continue;
-        }
-        TabletColumn column(column_iter->second);
-        auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
-                column, column.is_nullable());
-        // Dynamic generated columns does not appear in original tablet schema
-        if (_context.tablet_schema->field_index(column.name()) < 0) {
-            flush_schema->append_column(column);
-            flush_block.insert({data_type->create_column(), data_type, 
column.name()});
+
+    // If column already exist in original tablet schema, then we pick common 
type

Review Comment:
   There is no logic in append_column for a column that already exists in the
tablet schema. Is that handled by the call to get_least_common_schema below?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -759,76 +766,130 @@ Status 
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
     return Status::OK();
 }
 
-Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block,
-                                                TabletSchemaSPtr& 
flush_schema) {
-    if (block.rows() == 0) {
+Status BetaRowsetWriter::expand_variant_to_subcolumns(vectorized::Block& block,
+                                                      TabletSchemaSPtr& 
flush_schema) {
+    size_t num_rows = block.rows();
+    if (num_rows == 0) {
         return Status::OK();
     }
 
-    // Sanitize block to match exactly from the same type of frontend meta
-    vectorized::schema_util::FullBaseSchemaView schema_view;
-    schema_view.table_id = _context.tablet_schema->table_id();
-    vectorized::ColumnWithTypeAndName* variant_column =
-            block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
-    if (!variant_column) {
-        return Status::OK();
+    std::vector<int> variant_column_pos;
+    if (_context.tablet_schema->is_partial_update()) {
+        // check columns that used to do partial updates should not include 
variant
+        for (int i : _context.tablet_schema->get_update_cids()) {
+            if (_context.tablet_schema->columns()[i].is_variant_type()) {
+                return Status::InvalidArgument("Not implement partial updates 
for variant");
+            }
+        }
+    } else {
+        for (int i = 0; i < _context.tablet_schema->columns().size(); ++i) {
+            if (_context.tablet_schema->columns()[i].is_variant_type()) {
+                variant_column_pos.push_back(i);
+            }
+        }
     }
-    auto base_column = variant_column->column;
-    vectorized::ColumnObject& object_column =
-            
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
-    if (object_column.empty()) {
-        block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+
+    if (variant_column_pos.empty()) {
         return Status::OK();
     }
-    object_column.finalize();
-    // Has extended columns
-    
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+
+    try {
+        // Parse each variant column from raw string column
+        vectorized::schema_util::parse_variant_columns(block, 
variant_column_pos);
+        vectorized::schema_util::finalize_variant_columns(block, 
variant_column_pos,
+                                                          false /*not ingore 
sparse*/);
+        vectorized::schema_util::encode_variant_sparse_subcolumns(block, 
variant_column_pos);
+    } catch (const doris::Exception& e) {
+        // TODO more graceful, max_filter_ratio
+        LOG(WARNING) << "encounter execption " << e.to_string();
+        return Status::InternalError(e.to_string());
+    }
+
     // Dynamic Block consists of two parts, dynamic part of columns and static 
part of columns
-    //  static   dynamic
-    // | ----- | ------- |
+    //     static     extracted
+    // | --------- | ----------- |
     // The static ones are original _tablet_schame columns
-    flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema);
+    flush_schema = std::make_shared<TabletSchema>();
+    flush_schema->copy_from(*_context.tablet_schema);
     vectorized::Block flush_block(std::move(block));
-    // The dynamic ones are auto generated and extended, append them the the 
orig_block
-    for (auto& entry : object_column.get_subcolumns()) {
-        const std::string& column_name = entry->path.get_path();
-        auto column_iter = schema_view.column_name_to_column.find(column_name);
-        if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
-            // Column maybe dropped by light weight schema change DDL
-            continue;
-        }
-        TabletColumn column(column_iter->second);
-        auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
-                column, column.is_nullable());
-        // Dynamic generated columns does not appear in original tablet schema
-        if (_context.tablet_schema->field_index(column.name()) < 0) {
-            flush_schema->append_column(column);
-            flush_block.insert({data_type->create_column(), data_type, 
column.name()});
+
+    // If column already exist in original tablet schema, then we pick common 
type
+    // and cast column to common type, and modify tablet column to common type,
+    // otherwise it's a new column, we should add to frontend
+    auto append_column = [&](const TabletColumn& parent_variant, auto& 
column_entry_from_object) {
+        const std::string& column_name =
+                parent_variant.name_lower_case() + "." + 
column_entry_from_object->path.get_path();
+        const vectorized::DataTypePtr& final_data_type_from_object =
+                column_entry_from_object->data.get_least_common_type();
+        TabletColumn tablet_column;
+        vectorized::PathInDataBuilder full_path_builder;
+        auto full_path = 
full_path_builder.append(parent_variant.name_lower_case(), false)
+                                 
.append(column_entry_from_object->path.get_parts(), false)
+                                 .build();
+        vectorized::schema_util::get_column_by_type(
+                final_data_type_from_object, column_name, tablet_column,
+                vectorized::schema_util::ExtraInfo {.unique_id = -1,
+                                                    .parent_unique_id = 
parent_variant.unique_id(),
+                                                    .path_info = full_path});
+        flush_schema->append_column(std::move(tablet_column));
+        
flush_block.insert({column_entry_from_object->data.get_finalized_column_ptr()->get_ptr(),
+                            final_data_type_from_object, column_name});
+    };
+
+    // 1. Flatten variant column into flat columns, append flatten columns to 
the back of original Block and TabletSchema
+    // those columns are extracted columns, leave none extracted columns 
remain in original variant column, which is
+    // JSONB format at present.
+    // 2. Collect columns that need to be added or modified when data type 
changes or new columns encountered
+    for (size_t i = 0; i < variant_column_pos.size(); ++i) {
+        size_t variant_pos = variant_column_pos[i];
+        vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                
flush_block.get_by_position(variant_pos).column->assume_mutable_ref());
+        const TabletColumn& parent_column = 
_context.tablet_schema->columns()[variant_pos];
+        CHECK(object_column.is_finalized());

Review Comment:
   Where is object_column finalized?



##########
be/src/olap/rowset/rowset_writer_context.h:
##########
@@ -40,23 +40,24 @@ struct RowsetWriterContext {
     RowsetWriterContext()
             : tablet_id(0),
               tablet_schema_hash(0),
-              index_id(0),
               partition_id(0),
+              index_id(0),
               rowset_type(BETA_ROWSET),
               rowset_state(PREPARED),
               version(Version(0, 0)),
               txn_id(0),
               tablet_uid(0, 0),
-              segments_overlap(OVERLAP_UNKNOWN) {
+              segments_overlap(OVERLAP_UNKNOWN),
+              schema_lock(new std::mutex) {
         load_id.set_hi(0);
         load_id.set_lo(0);
     }
 
     RowsetId rowset_id;
     int64_t tablet_id;
     int64_t tablet_schema_hash;
-    int64_t index_id;
     int64_t partition_id;
+    int64_t index_id;

Review Comment:
   Why change the order?



##########
be/src/olap/rowset/rowset_writer.h:
##########
@@ -151,6 +151,8 @@ class RowsetWriter {
 
     virtual int64_t segment_writer_ns() { return 0; }
 
+    virtual RowsetWriterContext& mutable_context() = 0;

Review Comment:
   Why mutable? It's only called in RowsetBuilder::commit_txn() to read
tablet_schema, so a mutable reference is not necessary.



##########
be/src/olap/tablet.cpp:
##########
@@ -614,9 +614,9 @@ const RowsetSharedPtr Tablet::rowset_with_max_version() 
const {
     return iter->second;
 }
 
-RowsetMetaSharedPtr Tablet::rowset_meta_with_max_schema_version(
+TabletSchemaSPtr Tablet::tablet_schema_with_max_schema_version(

Review Comment:
   In fact, when the max-schema-version schema contains a variant column, this
returns the merged tablet schema of all rowsets, so it should be named something
like merged_tablet_schema.



##########
be/src/vec/common/schema_util.cpp:
##########
@@ -129,266 +139,254 @@ bool is_conversion_required_between_integers(FieldType 
lhs, FieldType rhs) {
     return true;
 }
 
-Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, 
ColumnPtr* result,
-                   RuntimeState* state) {
-    ColumnsWithTypeAndName arguments;
-    if (WhichDataType(type->get_type_id()).is_string()) {
-        // Special handle ColumnString, since the original cast logic use 
ColumnString's first item
-        // as the name of the dest type
-        arguments = {arg, {type->create_column_const(1, type->get_name()), 
type, ""}};
-    } else {
-        arguments = {arg, {type->create_column_const_with_default_value(1), 
type, ""}};
-    }
+Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, 
ColumnPtr* result) {
+    ColumnsWithTypeAndName arguments {
+            arg, {type->create_column_const_with_default_value(1), type, 
type->get_name()}};
     auto function = SimpleFunctionFactory::instance().get_function("CAST", 
arguments, type);
+    if (!function) {
+        return Status::InternalError("Not found cast function {} to {}", 
arg.type->get_name(),
+                                     type->get_name());
+    }
     Block tmp_block {arguments};
-    // the 0 position is input argument, the 1 position is to type argument, 
the 2 position is result argument
     vectorized::ColumnNumbers argnum;
     argnum.emplace_back(0);
     argnum.emplace_back(1);
     size_t result_column = tmp_block.columns();
+    auto ctx = FunctionContext::create_context(nullptr, {}, {});
+    // We convert column string to jsonb type just add a string jsonb field to 
dst column instead of parse
+    // each line in original string column.
+    ctx->set_string_as_jsonb_string(true);
     tmp_block.insert({nullptr, type, arg.name});
-    auto need_state_only = FunctionContext::create_context(state, {}, {});
-    RETURN_IF_ERROR(function->execute(need_state_only.get(), tmp_block, 
argnum, result_column,
-                                      arg.column->size()));
+    RETURN_IF_ERROR(
+            function->execute(ctx.get(), tmp_block, argnum, result_column, 
arg.column->size()));
     *result = std::move(tmp_block.get_by_position(result_column).column);
+    // Variant column is a really special case, src type is nullable but dst 
variant type is none nullable,
+    // but we still need to wrap nullmap into variant root column to prevent 
from nullable info lost.
+    // TODO rethink and better handle this sepecial situation
+    if (arg.type->is_nullable() && WhichDataType(type).is_variant_type()) {
+        auto variant = ColumnObject::create(true);
+        auto& old_variant = assert_cast<const 
ColumnObject&>(*(*result)->assume_mutable());
+        DCHECK(!old_variant.get_root()->is_nullable());
+        auto nullable = ColumnNullable::create(
+                old_variant.get_root(),
+                assert_cast<const 
ColumnNullable&>(*arg.column).get_null_map_column_ptr());
+        variant->create_root(make_nullable(arg.type), 
nullable->assume_mutable());
+        *result = std::move(variant);
+    }
     return Status::OK();
 }
 
-static void get_column_def(const vectorized::DataTypePtr& data_type, const 
std::string& name,
-                           TColumnDef* column) {
-    if (!name.empty()) {
-        column->columnDesc.__set_columnName(name);
+void get_column_by_type(const vectorized::DataTypePtr& data_type, const 
std::string& name,
+                        TabletColumn& column, const ExtraInfo& ext_info) {
+    column.set_name(name);
+    column.set_type(data_type->get_type_as_field_type());
+    if (ext_info.unique_id >= 0) {
+        column.set_unique_id(ext_info.unique_id);
+    }
+    if (ext_info.parent_unique_id >= 0) {
+        column.set_parent_unique_id(ext_info.parent_unique_id);
+    }
+    if (!ext_info.path_info.empty()) {
+        column.set_path_info(ext_info.path_info);
     }
     if (data_type->is_nullable()) {
         const auto& real_type = static_cast<const 
DataTypeNullable&>(*data_type);
-        column->columnDesc.__set_isAllowNull(true);
-        get_column_def(real_type.get_nested_type(), "", column);
+        column.set_is_nullable(true);
+        get_column_by_type(real_type.get_nested_type(), name, column, {});
         return;
     }
-    
column->columnDesc.__set_columnType(data_type->get_type_as_tprimitive_type());
     if (data_type->get_type_id() == TypeIndex::Array) {
-        TColumnDef child;
-        column->columnDesc.__set_children({});
-        get_column_def(assert_cast<const 
DataTypeArray*>(data_type.get())->get_nested_type(), "",
-                       &child);
-        column->columnDesc.columnLength =
-                
TabletColumn::get_field_length_by_type(column->columnDesc.columnType, 0);
-        column->columnDesc.children.push_back(child.columnDesc);
+        TabletColumn child;
+        get_column_by_type(assert_cast<const 
DataTypeArray*>(data_type.get())->get_nested_type(),
+                           "", child, {});
+        column.set_length(TabletColumn::get_field_length_by_type(
+                data_type->get_type_as_tprimitive_type(), 0));
+        column.add_sub_column(child);
+        column.set_default_value("[]");
         return;
     }
-    if (data_type->get_type_id() == TypeIndex::Tuple) {
-        // TODO
-        // auto tuple_type = assert_cast<const 
DataTypeTuple*>(data_type.get());
-        // DCHECK_EQ(tuple_type->get_elements().size(), 
tuple_type->get_element_names().size());
-        // for (size_t i = 0; i < tuple_type->get_elements().size(); ++i) {
-        //     TColumnDef child;
-        //     get_column_def(tuple_type->get_element(i), 
tuple_type->get_element_names()[i], &child);
-        //     column->columnDesc.children.push_back(child.columnDesc);
-        // }
-        // return;
-    }
-    if (data_type->get_type_id() == TypeIndex::String) {
+    // size is not fixed when type is string or json
+    if (WhichDataType(*data_type).is_string() || 
WhichDataType(*data_type).is_json()) {
         return;
     }
     if (WhichDataType(*data_type).is_simple()) {
-        
column->columnDesc.__set_columnLength(data_type->get_size_of_value_in_memory());
+        column.set_length(data_type->get_size_of_value_in_memory());
         return;
     }
+    // TODO handle more types like struct/date/datetime/decimal...
+    __builtin_unreachable();
 }
 
-// send an empty add columns rpc, the rpc response will fill with base schema 
info
-// maybe we could seperate this rpc from add columns rpc
-Status send_fetch_full_base_schema_view_rpc(FullBaseSchemaView* schema_view) {
-    TAddColumnsRequest req;
-    TAddColumnsResult res;
-    TTabletInfo tablet_info;
-    req.__set_table_name(schema_view->table_name);
-    req.__set_db_name(schema_view->db_name);
-    req.__set_table_id(schema_view->table_id);
-    // Set empty columns
-    req.__set_addColumns({});
-    auto master_addr = ExecEnv::GetInstance()->master_info()->network_address;
-    Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
-            master_addr.hostname, master_addr.port,
-            [&req, &res](FrontendServiceConnection& client) { 
client->addColumns(res, req); },
-            config::txn_commit_rpc_timeout_ms);
-    if (!rpc_st.ok()) {
-        return Status::InternalError("Failed to fetch schema info, encounter 
rpc failure");
-    }
-    // TODO(lhy) handle more status code
-    if (res.status.status_code != TStatusCode::OK) {
-        LOG(WARNING) << "failed to fetch schema info, code:" << 
res.status.status_code
-                     << ", msg:" << res.status.error_msgs[0];
-        return Status::InvalidArgument(
-                fmt::format("Failed to fetch schema info, {}", 
res.status.error_msgs[0]));
-    }
-    for (const auto& column : res.allColumns) {
-        schema_view->column_name_to_column[column.column_name] = column;
+TabletColumn get_least_type_column(const TabletColumn& original, const 
DataTypePtr& new_type,
+                                   const ExtraInfo& ext_info, bool* changed) {
+    TabletColumn result_column;
+    vectorized::DataTypePtr original_type = original.get_vec_type();
+    vectorized::DataTypePtr common_type;
+    vectorized::get_least_supertype<vectorized::LeastSupertypeOnError::Jsonb>(
+            vectorized::DataTypes {original_type, new_type}, &common_type);
+    if (!original_type->equals(*common_type)) {
+        // update to common type
+        *changed = true;
+        vectorized::schema_util::get_column_by_type(common_type, 
original.name(), result_column,
+                                                    ext_info);
+    } else {
+        *changed = false;
+        result_column = original;
+        result_column.set_parent_unique_id(ext_info.parent_unique_id);
+        result_column.set_unique_id(ext_info.unique_id);
+        result_column.set_path_info(ext_info.path_info);
     }
-    schema_view->schema_version = res.schema_version;
-    return Status::OK();
+    return result_column;
 }
 
-static const std::regex COLUMN_NAME_REGEX(
-        "^[_a-zA-Z@0-9\\s<>/][.a-zA-Z0-9_+-/><?@#$%^&*\"\\s,:]{0,255}$");
-
-// Do batch add columns schema change
-// only the base table supported
-Status send_add_columns_rpc(ColumnsWithTypeAndName column_type_names,
-                            FullBaseSchemaView* schema_view) {
-    if (column_type_names.empty()) {
-        return Status::OK();
+void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+                                TabletSchemaSPtr& common_schema, int32_t 
variant_col_unique_id) {
+    // Types of subcolumns by path from all tuples.
+    std::unordered_map<PathInData, DataTypes, PathInData::Hash> 
subcolumns_types;
+    for (const TabletSchemaSPtr& schema : schemas) {
+        for (const TabletColumn& col : schema->columns()) {
+            // Get subcolumns of this variant
+            if (!col.path_info().empty() && col.parent_unique_id() > 0 &&
+                col.parent_unique_id() == variant_col_unique_id) {
+                subcolumns_types[col.path_info()].push_back(
+                        DataTypeFactory::instance().create_data_type(col, 
col.is_nullable()));
+            }
+        }
     }
-    TAddColumnsRequest req;
-    TAddColumnsResult res;
-    TTabletInfo tablet_info;
-    req.__set_table_name(schema_view->table_name);
-    req.__set_db_name(schema_view->db_name);
-    req.__set_table_id(schema_view->table_id);
-    // TODO(lhy) more configurable
-    req.__set_allow_type_conflict(true);
-    req.__set_addColumns({});
-    // Deduplicate Column like `Level` and `level`
-    // TODO we will implement new version of dynamic column soon to handle 
this issue,
-    // also ignore column missmatch with regex
-    std::set<std::string> dedup;
-    for (const auto& column_type_name : column_type_names) {
-        if (dedup.contains(to_lower(column_type_name.name))) {
+    PathsInData tuple_paths;
+    DataTypes tuple_types;
+    // Get the least common type for all paths.
+    for (const auto& [key, subtypes] : subcolumns_types) {
+        assert(!subtypes.empty());
+        if (key.get_path() == ColumnObject::COLUMN_NAME_DUMMY) {
             continue;
         }
-        if (!std::regex_match(column_type_name.name, COLUMN_NAME_REGEX)) {
+        size_t first_dim = get_number_of_dimensions(*subtypes[0]);
+        tuple_paths.emplace_back(key);
+        for (size_t i = 1; i < subtypes.size(); ++i) {
+            if (first_dim != get_number_of_dimensions(*subtypes[i])) {
+                
tuple_types.emplace_back(make_nullable(std::make_shared<DataTypeJsonb>()));
+                LOG(INFO) << fmt::format(
+                        "Uncompatible types of subcolumn '{}': {} and {}, cast 
to JSONB",
+                        key.get_path(), subtypes[0]->get_name(), 
subtypes[i]->get_name());
+                break;
+            }
+        }
+        if (tuple_paths.size() == tuple_types.size()) {
             continue;
         }
-        dedup.insert(to_lower(column_type_name.name));
-        TColumnDef col;
-        get_column_def(column_type_name.type, column_type_name.name, &col);
-        req.addColumns.push_back(col);
-    }
-    auto master_addr = ExecEnv::GetInstance()->master_info()->network_address;
-    Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
-            master_addr.hostname, master_addr.port,
-            [&req, &res](FrontendServiceConnection& client) { 
client->addColumns(res, req); },
-            config::txn_commit_rpc_timeout_ms);
-    if (!rpc_st.ok()) {
-        return Status::InternalError("Failed to do schema change, rpc error");
-    }
-    // TODO(lhy) handle more status code
-    if (res.status.status_code != TStatusCode::OK) {
-        LOG(WARNING) << "failed to do schema change, code:" << 
res.status.status_code
-                     << ", msg:" << res.status.error_msgs[0];
-        return Status::InvalidArgument(
-                fmt::format("Failed to do schema change, {}", 
res.status.error_msgs[0]));
-    }
-    size_t sz = res.allColumns.size();
-    if (sz < dedup.size()) {
-        return Status::InternalError(
-                fmt::format("Unexpected result columns {}, expected at least 
{}",
-                            res.allColumns.size(), column_type_names.size()));
-    }
-    for (const auto& column : res.allColumns) {
-        schema_view->column_name_to_column[column.column_name] = column;
+        DataTypePtr common_type;
+        get_least_supertype<LeastSupertypeOnError::Jsonb>(subtypes, 
&common_type);
+        if (!common_type->is_nullable()) {
+            common_type = make_nullable(common_type);
+        }
+        tuple_types.emplace_back(common_type);
     }
-    schema_view->schema_version = res.schema_version;
-    return Status::OK();
-}
+    CHECK_EQ(tuple_paths.size(), tuple_types.size());
 
-Status unfold_object(size_t dynamic_col_position, Block& block, bool 
cast_to_original_type,
-                     RuntimeState* state) {
-    auto dynamic_col = 
block.get_by_position(dynamic_col_position).column->assume_mutable();
-    auto* column_object_ptr = assert_cast<ColumnObject*>(dynamic_col.get());
-    if (column_object_ptr->empty()) {
-        return Status::OK();
+    std::string variant_col_name = 
common_schema->column_by_uid(variant_col_unique_id).name();
+    // Append all common type columns of this variant
+    for (int i = 0; i < tuple_paths.size(); ++i) {
+        TabletColumn common_column;
+        // const std::string& column_name = variant_col_name + "." + 
tuple_paths[i].get_path();

Review Comment:
   Why not build column_name with the variant_col_name prefix?



##########
be/src/olap/rowset/segment_v2/encoding_info.cpp:
##########
@@ -284,6 +284,10 @@ EncodingInfoResolver::EncodingInfoResolver() {
     _add_map<FieldType::OLAP_FIELD_TYPE_JSONB, PLAIN_ENCODING>();

Review Comment:
   Maybe we should use `PLAIN_ENCODING` and `PLAIN_ENCODING, true` only for JSONB
and VARIANT.



##########
be/src/olap/tablet_meta.cpp:
##########
@@ -363,7 +363,8 @@ void TabletMeta::init_column_from_tcolumn(uint32_t 
unique_id, const TColumn& tco
     }
     for (size_t i = 0; i < tcolumn.children_column.size(); i++) {
         ColumnPB* children_column = column->add_children_columns();
-        init_column_from_tcolumn(i, tcolumn.children_column[i], 
children_column);
+        init_column_from_tcolumn(tcolumn.children_column[i].col_unique_id,

Review Comment:
   Is this an old bug?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -759,76 +766,130 @@ Status 
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
     return Status::OK();
 }
 
-Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block,
-                                                TabletSchemaSPtr& 
flush_schema) {
-    if (block.rows() == 0) {
+Status BetaRowsetWriter::expand_variant_to_subcolumns(vectorized::Block& block,
+                                                      TabletSchemaSPtr& 
flush_schema) {
+    size_t num_rows = block.rows();
+    if (num_rows == 0) {
         return Status::OK();
     }
 
-    // Sanitize block to match exactly from the same type of frontend meta
-    vectorized::schema_util::FullBaseSchemaView schema_view;
-    schema_view.table_id = _context.tablet_schema->table_id();
-    vectorized::ColumnWithTypeAndName* variant_column =
-            block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
-    if (!variant_column) {
-        return Status::OK();
+    std::vector<int> variant_column_pos;
+    if (_context.tablet_schema->is_partial_update()) {
+        // check columns that used to do partial updates should not include 
variant
+        for (int i : _context.tablet_schema->get_update_cids()) {
+            if (_context.tablet_schema->columns()[i].is_variant_type()) {
+                return Status::InvalidArgument("Not implement partial updates 
for variant");
+            }
+        }
+    } else {
+        for (int i = 0; i < _context.tablet_schema->columns().size(); ++i) {
+            if (_context.tablet_schema->columns()[i].is_variant_type()) {
+                variant_column_pos.push_back(i);
+            }
+        }
     }
-    auto base_column = variant_column->column;
-    vectorized::ColumnObject& object_column =
-            
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
-    if (object_column.empty()) {
-        block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+
+    if (variant_column_pos.empty()) {
         return Status::OK();
     }
-    object_column.finalize();
-    // Has extended columns
-    
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+
+    try {
+        // Parse each variant column from raw string column
+        vectorized::schema_util::parse_variant_columns(block, 
variant_column_pos);
+        vectorized::schema_util::finalize_variant_columns(block, 
variant_column_pos,
+                                                          false /*not ingore 
sparse*/);
+        vectorized::schema_util::encode_variant_sparse_subcolumns(block, 
variant_column_pos);
+    } catch (const doris::Exception& e) {
+        // TODO more graceful, max_filter_ratio
+        LOG(WARNING) << "encounter execption " << e.to_string();
+        return Status::InternalError(e.to_string());
+    }
+
     // Dynamic Block consists of two parts, dynamic part of columns and static 
part of columns
-    //  static   dynamic
-    // | ----- | ------- |
+    //     static     extracted
+    // | --------- | ----------- |
     // The static ones are original _tablet_schame columns
-    flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema);
+    flush_schema = std::make_shared<TabletSchema>();
+    flush_schema->copy_from(*_context.tablet_schema);
     vectorized::Block flush_block(std::move(block));
-    // The dynamic ones are auto generated and extended, append them the the 
orig_block
-    for (auto& entry : object_column.get_subcolumns()) {
-        const std::string& column_name = entry->path.get_path();
-        auto column_iter = schema_view.column_name_to_column.find(column_name);
-        if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
-            // Column maybe dropped by light weight schema change DDL
-            continue;
-        }
-        TabletColumn column(column_iter->second);
-        auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
-                column, column.is_nullable());
-        // Dynamic generated columns does not appear in original tablet schema
-        if (_context.tablet_schema->field_index(column.name()) < 0) {
-            flush_schema->append_column(column);
-            flush_block.insert({data_type->create_column(), data_type, 
column.name()});
+
+    // If column already exist in original tablet schema, then we pick common 
type
+    // and cast column to common type, and modify tablet column to common type,
+    // otherwise it's a new column, we should add to frontend
+    auto append_column = [&](const TabletColumn& parent_variant, auto& 
column_entry_from_object) {
+        const std::string& column_name =
+                parent_variant.name_lower_case() + "." + 
column_entry_from_object->path.get_path();
+        const vectorized::DataTypePtr& final_data_type_from_object =
+                column_entry_from_object->data.get_least_common_type();
+        TabletColumn tablet_column;
+        vectorized::PathInDataBuilder full_path_builder;
+        auto full_path = 
full_path_builder.append(parent_variant.name_lower_case(), false)
+                                 
.append(column_entry_from_object->path.get_parts(), false)
+                                 .build();
+        vectorized::schema_util::get_column_by_type(
+                final_data_type_from_object, column_name, tablet_column,
+                vectorized::schema_util::ExtraInfo {.unique_id = -1,
+                                                    .parent_unique_id = 
parent_variant.unique_id(),
+                                                    .path_info = full_path});
+        flush_schema->append_column(std::move(tablet_column));
+        
flush_block.insert({column_entry_from_object->data.get_finalized_column_ptr()->get_ptr(),
+                            final_data_type_from_object, column_name});
+    };
+
+    // 1. Flatten variant column into flat columns, append flatten columns to 
the back of original Block and TabletSchema
+    // those columns are extracted columns, leave none extracted columns 
remain in original variant column, which is
+    // JSONB format at present.
+    // 2. Collect columns that need to be added or modified when data type 
changes or new columns encountered
+    for (size_t i = 0; i < variant_column_pos.size(); ++i) {
+        size_t variant_pos = variant_column_pos[i];
+        vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                
flush_block.get_by_position(variant_pos).column->assume_mutable_ref());
+        const TabletColumn& parent_column = 
_context.tablet_schema->columns()[variant_pos];
+        CHECK(object_column.is_finalized());
+        std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root;
+        for (auto& entry : object_column.get_subcolumns()) {
+            if (entry->path.empty()) {
+                // root
+                root = entry;
+                continue;
+            }
+            append_column(parent_column, entry);
         }
+        // Create new variant column and set root column
+        auto obj = vectorized::ColumnObject::create(true, false);
+        // '{}' indicates a root path
+        static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
+                {}, root->data.get_finalized_column_ptr()->assume_mutable(),
+                root->data.get_least_common_type());
+        flush_block.get_by_position(variant_pos).column = obj->get_ptr();
+        vectorized::PathInDataBuilder full_root_path_builder;
+        auto full_root_path =
+                full_root_path_builder.append(parent_column.name_lower_case(), 
false).build();
+        
flush_schema->mutable_columns()[variant_pos].set_path_info(full_root_path);
+        VLOG_DEBUG << "set root_path : " << full_root_path.get_path();
     }
 
-    // Ensure column are all present at this schema version.Otherwise there 
will be some senario:
-    //  Load1 -> version(10) with schema [a, b, c, d, e], d & e is new added 
columns and schema version became 10
-    //  Load2 -> version(10) with schema [a, b, c] and has no extended columns 
and fetched the schema at version 10
-    //  Load2 will persist meta with [a, b, c] but Load1 will persist meta 
with [a, b, c, d, e]
-    // So we should make sure that rowset at the same schema version alawys 
contain the same size of columns.
-    // so that all columns at schema_version is in either 
_context.tablet_schema or schema_change_recorder
-    for (const auto& [name, column] : schema_view.column_name_to_column) {
-        if (_context.tablet_schema->field_index(name) == -1) {
-            const auto& tcolumn = schema_view.column_name_to_column[name];
-            TabletColumn new_column(tcolumn);
-            _context.schema_change_recorder->add_extended_columns(column,
-                                                                  
schema_view.schema_version);
-        }
+    {
+        // Update rowset schema, tablet's tablet schema will be updated when 
build Rowset
+        // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
+        // ctx.tablet_schema:  A(bigint), B(double)
+        // => update_schema:   A(bigint), B(double), C(int), D(int)
+        std::lock_guard<std::mutex> lock(*(_context.schema_lock));

Review Comment:
   Is the lock necessary? Is _context shared between different rowsets, or between 
different segments of the same rowset?



##########
be/src/olap/tablet.cpp:
##########
@@ -626,6 +626,18 @@ RowsetMetaSharedPtr 
Tablet::rowset_meta_with_max_schema_version(
                                           : 
a->tablet_schema()->schema_version() <
                                                     
b->tablet_schema()->schema_version());
             });
+    TabletSchemaSPtr target_schema = max_schema_version_rs->tablet_schema();
+    if (target_schema->num_variant_columns() > 0) {
+        // For variant columns tablet schema need to be the merged wide tablet 
schema
+        std::vector<TabletSchemaSPtr> schemas;
+        std::transform(rowset_metas.begin(), rowset_metas.end(), 
std::back_inserter(schemas),
+                       [](const RowsetMetaSharedPtr& rs_meta) { return 
rs_meta->tablet_schema(); });
+        target_schema = std::make_shared<TabletSchema>();
+        // TODO(lhy) maybe slow?
+        vectorized::schema_util::get_least_common_schema(schemas, 
target_schema);

Review Comment:
   Why merge all rowset metas if there is a variant column in the max-version schema? 
Why not just use the max-version schema?



##########
be/src/olap/rowset/beta_rowset_writer_v2.h:
##########
@@ -104,6 +100,8 @@ class BetaRowsetWriterV2 : public RowsetWriter {
         return nullptr;
     }
 
+    RowsetWriterContext& mutable_context() override { LOG(FATAL) << "not 
implemented"; }

Review Comment:
   What's the difference between BetaRowsetWriterV2 and BetaRowsetWriter? And 
why not implement it in BetaRowsetWriterV2?



##########
be/src/vec/olap/olap_data_convertor.cpp:
##########
@@ -587,28 +594,17 @@ const void* 
OlapBlockDataConvertor::OlapColumnDataConvertorVarChar::get_data_at(
     return null_flag ? nullptr : _slice.data() + offset;
 }
 
-Status 
OlapBlockDataConvertor::OlapColumnDataConvertorVarChar::convert_to_olap() {
-    assert(_typed_column.column);
-    const vectorized::ColumnString* column_string = nullptr;
-    if (_nullmap) {
-        auto nullable_column =
-                assert_cast<const 
vectorized::ColumnNullable*>(_typed_column.column.get());
-        column_string = assert_cast<const vectorized::ColumnString*>(
-                nullable_column->get_nested_column_ptr().get());
-    } else {
-        column_string = assert_cast<const 
vectorized::ColumnString*>(_typed_column.column.get());
-    }
-
+Status OlapBlockDataConvertor::OlapColumnDataConvertorVarChar::convert_to_olap(

Review Comment:
   Why change OlapColumnDataConvertorVarChar?



##########
be/src/olap/rowset_builder.cpp:
##########
@@ -266,6 +267,20 @@ Status RowsetBuilder::commit_txn() {
 
     std::lock_guard<std::mutex> l(_lock);
     SCOPED_TIMER(_commit_txn_timer);
+
+    if (_tablet->tablet_schema()->num_variant_columns() > 0) {

Review Comment:
   Is this for memtable move forward?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to