eldenmoon commented on code in PR #33298:
URL: https://github.com/apache/doris/pull/33298#discussion_r1559132711


##########
cloud/src/meta-service/meta_service_schema.cpp:
##########
@@ -119,4 +128,239 @@ bool parse_schema_value(const ValueBuf& buf, 
doris::TabletSchemaCloudPB* schema)
     return buf.to_pb(schema);
 }
 
+// Map item to dictionary key, and add key to rowset meta, if it is a new one, 
generate it and increase item id
+// Need to remove dynamic parts from original RowsetMeta's TabletSchema, to 
make fdb schema kv stable
+template<typename ItemPB>
+void process_dictionary(
+    SchemaCloudDictionary& dict,
+    const google::protobuf::Map<int32_t, ItemPB>& item_dict,
+    google::protobuf::RepeatedPtrField<ItemPB>* result,
+    RowsetMetaCloudPB* rowset_meta,
+    const google::protobuf::RepeatedPtrField<ItemPB>& items,
+    const std::function<bool(const ItemPB&)>& filter,
+    const std::function<void(int32_t)>& add_dict_key_fn) {
+    if (items.empty()) {
+        return;
+    }
+    // Use deterministic method to do serialization since structure like
+    // `google::protobuf::Map`'s serialization is unstable
+    auto serialize_fn = [](const ItemPB& item) -> std::string {
+        std::string output;
+        google::protobuf::io::StringOutputStream string_output_stream(&output);
+        google::protobuf::io::CodedOutputStream 
output_stream(&string_output_stream);
+        output_stream.SetSerializationDeterministic(true);
+        item.SerializeToCodedStream(&output_stream);
+        return output;
+    };
+
+    google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+    std::unordered_map<std::string, int> reversed_dict;
+    for (const auto& [key, val] : item_dict) {
+        reversed_dict[serialize_fn(val)] = key;
+    }
+
+    for (const auto& item : items) {
+        if (filter(item)) {
+            // Filter none extended items, mainly extended columns and 
extended indexes
+            *none_ext_items.Add() = item;
+            continue;
+        }
+        const std::string serialized_key = serialize_fn(item);
+        auto it = reversed_dict.find(serialized_key);
+        if (it != reversed_dict.end()) {
+            // Add existed dict key to related dict
+            add_dict_key_fn(it->second);
+        } else {
+            // Add new dictionary key-value pair and update 
current_xxx_dict_id.
+            int64_t current_dict_id = 0;
+            if constexpr (std::is_same_v<ItemPB, ColumnPB>) {
+                current_dict_id = dict.current_column_dict_id() + 1;
+                dict.set_current_column_dict_id(current_dict_id);
+                dict.mutable_column_dict()->emplace(current_dict_id, item);
+            }
+            if constexpr (std::is_same_v<ItemPB, doris::TabletIndexPB>) {
+                current_dict_id = dict.current_index_dict_id() + 1;
+                dict.set_current_index_dict_id(current_dict_id);
+                dict.mutable_index_dict()->emplace(current_dict_id, item);
+            }
+            add_dict_key_fn(current_dict_id);
+            reversed_dict[serialized_key] = current_dict_id;
+            // LOG(INFO) << "Add dict key = " << current_dict_id << " dict 
value = " << item.ShortDebugString();
+        }
+    }
+    if (result != nullptr) {
+        result->Swap(&none_ext_items);
+    }
+}
+
+// Writes schema dictionary metadata to RowsetMetaCloudPB.
+// Schema was extended in BE side, we need to reset schema to original 
frontend schema and store
+// such restored schema in fdb. And also add extra dict key info to 
RowsetMetaCloudPB.
+std::pair<MetaServiceCode, std::string> write_schema_dict(
+            const std::string& instance_id,
+            Transaction* txn,
+            RowsetMetaCloudPB* rowset_meta) {
+    std::string msg;
+    std::stringstream ss;
+    MetaServiceCode code = MetaServiceCode::OK;
+    // wrtie dict to rowset meta and update dict
+    SchemaCloudDictionary dict;
+    std::string dict_key = meta_schema_pb_dictionary_key({instance_id, 
rowset_meta->index_id()});
+    ValueBuf dict_val;
+    auto err = cloud::get(txn, dict_key, &dict_val);
+    LOG(INFO) << "Retrieved column pb dictionary, index_id=" << 
rowset_meta->index_id()
+              << " key=" << hex(dict_key) << " error=" << err;
+    if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK) 
{
+        // Handle retrieval error.
+        ss << "Failed to retrieve column pb dictionary, instance_id=" << 
instance_id
+           << " table_id=" << rowset_meta->index_id() << " key=" << 
hex(dict_key) << " error=" << err;
+        msg = ss.str();
+        code = cast_as<ErrCategory::READ>(err);
+        return {code, msg};
+    } 
+    if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && !dict_val.to_pb(&dict)) {
+        // Handle parse error.
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        msg = fmt::format("Malformed tablet dictionary value, key={}", 
hex(dict_key));
+        return {code, msg};
+    }
+
+    // collect sparse columns and clear in parent column
+    google::protobuf::RepeatedPtrField<ColumnPB> sparse_columns;
+    for (auto& column_pb : 
*rowset_meta->mutable_tablet_schema()->mutable_column()) {
+        if (column_pb.type() == "VARIANT" && 
!column_pb.sparse_columns().empty()) {
+            // set parent_id for restore info
+            for (auto& sparse_col: *column_pb.mutable_sparse_columns()) {
+                sparse_col.set_parent_unique_id(column_pb.unique_id());
+            }
+            sparse_columns.Add(column_pb.sparse_columns().begin(), 
column_pb.sparse_columns().end());
+        }
+        column_pb.clear_sparse_columns();
+    }
+    auto* dict_list = rowset_meta->mutable_schema_dict_key_list();
+    // handle column dict
+    auto original_column_dict_id = dict.current_column_dict_id();
+    auto column_filter = [&](const doris::ColumnPB& col) -> bool {return 
col.unique_id() >= 0;};
+    auto column_dict_adder = [&](int32_t key) { 
dict_list->add_column_dict_key_list(key); };
+    process_dictionary<doris::ColumnPB>(dict, dict.column_dict(),
+            rowset_meta->mutable_tablet_schema()->mutable_column(),
+                    rowset_meta, rowset_meta->tablet_schema().column(),
+                    column_filter, column_dict_adder);
+
+    // handle sparse column dict
+    auto sparse_column_dict_adder = [&](int32_t key) { 
dict_list->add_sparse_column_dict_key_list(key); };
+    // not filter any
+    auto sparse_column_filter = [&](const doris::ColumnPB& col) -> bool 
{return false;};
+    process_dictionary<doris::ColumnPB>(dict, dict.column_dict(),
+                    nullptr, rowset_meta, sparse_columns,
+                    sparse_column_filter, sparse_column_dict_adder);
+
+    // handle index info dict
+    auto original_index_dict_id = dict.current_index_dict_id();
+    auto index_filter = [&](const doris::TabletIndexPB& index_pb) -> bool 
{return index_pb.index_suffix_name().empty();};
+    auto index_dict_adder = [&](int32_t key) { 
dict_list->add_index_info_dict_key_list(key); };
+    process_dictionary<doris::TabletIndexPB>(dict, dict.index_dict(),
+            rowset_meta->mutable_tablet_schema()->mutable_index(),
+                    rowset_meta, rowset_meta->tablet_schema().index(),
+                    index_filter, index_dict_adder);
+
+    // Write back modified dictionaries.
+    if (original_index_dict_id != dict.current_index_dict_id()
+            || original_column_dict_id != dict.current_column_dict_id()) {
+        // If dictionary was modified, serialize and save it.
+        std::string dict_key = meta_schema_pb_dictionary_key({instance_id, 
rowset_meta->index_id()});
+        std::string dict_val;
+        if (!dict.SerializeToString(&dict_val)) {
+            // Handle serialization error.
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            ss << "Failed to serialize dictionary for saving, txn_id=" << 
rowset_meta->txn_id();
+            msg = ss.str();
+            return {code, msg};
+        }
+        // Limit the size of dict value
+        if (dict_val.size() > config::schema_dict_kv_size_limit) {
+            code = MetaServiceCode::KV_TXN_COMMIT_ERR;
+            ss << "Failed to write dictionary for saving, txn_id=" << 
rowset_meta->txn_id()
+                << ", reached the limited size threshold of SchemaDictKeyList 
" << config::schema_dict_kv_size_limit;
+            msg = ss.str();
+        }
+        // splitting large values (>90*1000) into multiple KVs
+        cloud::put(txn, dict_key, dict_val, 0);
+        LOG(INFO) << "Dictionary saved, key=" << hex(dict_key) << " txn_id=" 
<< rowset_meta->txn_id()
+                  << " Dict size=" << dict.column_dict_size()
+                  << ", Current column ID=" << dict.current_column_dict_id()
+                  << ", Current index ID=" << dict.current_index_dict_id();
+    }
+    return {code, msg};
+}
+
+std::pair<MetaServiceCode, std::string> read_schema_from_dict(
+            const std::string& instance_id,
+            int64_t index_id,
+            Transaction* txn,
+            google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>* 
rowset_metas) {
+    std::string msg;
+    std::stringstream ss;
+    MetaServiceCode code = MetaServiceCode::OK;
+
+    // read dict if any rowset has dict key list
+    SchemaCloudDictionary dict;
+    std::string column_dict_key = meta_schema_pb_dictionary_key({instance_id, 
index_id});
+    ValueBuf dict_val;
+    auto err = cloud::get(txn, column_dict_key, &dict_val);
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "internal error, failed to get dict ret=" << err;
+        msg = ss.str();
+        return {code, msg};
+    }
+    if (err == TxnErrorCode::TXN_OK && !dict_val.to_pb(&dict)) [[unlikely]] {

Review Comment:
   If parsing fails here, the dict key list and the dict itself will be silently ignored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to