This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e44f45aa2bd [Improve](cloud) support caching SchemaCloudDictionary in
BE side (#47629)
e44f45aa2bd is described below
commit e44f45aa2bd0dad156a3f9034a565cbf9bad4374
Author: lihangyu <[email protected]>
AuthorDate: Fri Feb 28 13:06:40 2025 +0800
[Improve](cloud) support caching SchemaCloudDictionary in BE side (#47629)
The most important changes focus on adding support for schema dictionary
caching, modifying existing methods to utilize the cache, and ensuring
schema consistency.
```
/*
 * SchemaCloudDictionaryCache provides a local cache for SchemaCloudDictionary.
 *
 * Caching logic:
 *  - If the dictionary associated with a given key has not had any new columns added
 *    (determined by comparing the serialized data for consistency),
 *    the cached dictionary is directly used to update the dictionary list in the rowset meta
 *    (similar to the process_dictionary logic in write_schema_dict).
 *  - If new columns have been detected, the local cache is disregarded, and the updated
 *    dictionary should be fetched via the meta service.
 */
```
Use SchemaCloudDictionaryCache in the Backend to reduce how often
SchemaCloudDictionary is read and converted in MetaService.
---
be/src/cloud/cloud_meta_mgr.cpp | 78 ++++++-
be/src/cloud/cloud_meta_mgr.h | 4 +
be/src/cloud/cloud_storage_engine.cpp | 5 +
be/src/cloud/cloud_storage_engine.h | 5 +
be/src/cloud/schema_cloud_dictionary_cache.cpp | 226 +++++++++++++++++++++
be/src/cloud/schema_cloud_dictionary_cache.h | 101 +++++++++
be/src/common/config.cpp | 3 +-
be/src/common/config.h | 3 +-
be/src/runtime/memory/cache_policy.h | 9 +-
.../cloud/test_schema_cloud_dictionary_cache.cpp | 178 ++++++++++++++++
cloud/src/common/bvars.cpp | 2 +-
cloud/src/common/bvars.h | 1 +
cloud/src/meta-service/meta_service.cpp | 55 ++++-
cloud/src/meta-service/meta_service.h | 10 +
cloud/src/meta-service/meta_service_schema.cpp | 6 +-
gensrc/proto/cloud.proto | 13 ++
16 files changed, 687 insertions(+), 12 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 41e60b5e264..c7a86d19905 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -40,6 +40,7 @@
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
+#include "cloud/schema_cloud_dictionary_cache.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
@@ -342,6 +343,8 @@ static std::string debug_info(const Request& req) {
req.tablet_id(), req.lock_id());
} else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) {
return fmt::format(" tablet_id={}", req.tablet_id());
+ } else if constexpr (is_any_v<Request, GetSchemaDictRequest>) {
+ return fmt::format(" index_id={}", req.index_id());
} else {
static_assert(!sizeof(Request));
}
@@ -473,10 +476,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet*
tablet, bool warmup_delta_
req.set_cumulative_point(tablet->cumulative_layer_point());
}
req.set_end_version(-1);
- // backend side use schema dict
- if (config::variant_use_cloud_schema_dict) {
- req.set_schema_op(GetRowsetRequest::RETURN_DICT);
- }
+ // Use the schema dict cached on the backend side when the cloud schema dict cache is enabled
+ req.set_schema_op(config::variant_use_cloud_schema_dict_cache
+ ? GetRowsetRequest::NO_DICT
+ : GetRowsetRequest::RETURN_DICT);
VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();
stub->get_rowset(&cntl, &req, &resp, nullptr);
@@ -592,8 +595,28 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet*
tablet, bool warmup_delta_
existed_rowset->rowset_id().to_string() ==
cloud_rs_meta_pb.rowset_id_v2()) {
continue; // Same rowset, skip it
}
- RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(
- cloud_rs_meta_pb, resp.has_schema_dict() ?
&resp.schema_dict() : nullptr);
+ RowsetMetaPB meta_pb;
+ // Check if the rowset meta contains a schema dictionary key
list.
+ if (cloud_rs_meta_pb.has_schema_dict_key_list() &&
!resp.has_schema_dict()) {
+ // Use the locally cached dictionary.
+ RowsetMetaCloudPB copied_cloud_rs_meta_pb =
cloud_rs_meta_pb;
+ CloudStorageEngine& engine =
+
ExecEnv::GetInstance()->storage_engine().to_cloud();
+ {
+ wlock.unlock();
+ RETURN_IF_ERROR(
+ engine.get_schema_cloud_dictionary_cache()
+
.replace_dict_keys_to_schema(cloud_rs_meta_pb.index_id(),
+
&copied_cloud_rs_meta_pb));
+ wlock.lock();
+ }
+ meta_pb =
cloud_rowset_meta_to_doris(copied_cloud_rs_meta_pb);
+ } else {
+ // Otherwise, use the schema dictionary from the response
(if available).
+ meta_pb = cloud_rowset_meta_to_doris(
+ cloud_rs_meta_pb,
+ resp.has_schema_dict() ? &resp.schema_dict() :
nullptr);
+ }
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->init_from_pb(meta_pb);
RowsetSharedPtr rowset;
@@ -835,6 +858,14 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta&
rs_meta,
RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb();
doris_rowset_meta_to_cloud(req.mutable_rowset_meta(),
std::move(rs_meta_pb));
+ // Replace schema dictionary keys based on the rowset's index ID to
maintain schema consistency.
+ CloudStorageEngine& engine =
ExecEnv::GetInstance()->storage_engine().to_cloud();
+ // If the dict cache is disabled, treat the keys as replaced so that no refresh is triggered
+ bool replaced =
+ config::variant_use_cloud_schema_dict_cache
+ ?
engine.get_schema_cloud_dictionary_cache().replace_schema_to_dict_keys(
+ rs_meta_pb.index_id(), req.mutable_rowset_meta())
+ : true;
Status st = retry_rpc("commit rowset", req, &resp,
&MetaService_Stub::commit_rowset);
if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
@@ -845,6 +876,13 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta&
rs_meta,
}
return Status::AlreadyExist("failed to commit rowset: {}",
resp.status().msg());
}
+ // If dictionary replacement fails, it may indicate that the local schema
dictionary is outdated.
+ // Refreshing the dictionary here ensures that the rowset metadata is
updated with the latest schema definitions,
+ // which is critical for maintaining consistency between the rowset and
its corresponding schema.
+ if (!replaced) {
+ RETURN_IF_ERROR(
+
engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id()));
+ }
return st;
}
@@ -1389,5 +1427,33 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const
RowsetMeta& rs_meta) {
return total_inverted_index_size;
}
+Status CloudMetaMgr::get_schema_dict(int64_t index_id,
+ std::shared_ptr<SchemaCloudDictionary>*
schema_dict) {
+ VLOG_DEBUG << "Sending GetSchemaDictRequest, index_id: " << index_id;
+
+ // Create the request and response objects.
+ GetSchemaDictRequest req;
+ GetSchemaDictResponse resp;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ req.set_index_id(index_id);
+
+ // Invoke RPC via the retry_rpc helper function.
+ // It will call the MetaService_Stub::get_schema_dict method.
+ Status st = retry_rpc("get schema dict", req, &resp,
&MetaService_Stub::get_schema_dict);
+ if (!st.ok()) {
+ return st;
+ }
+
+ // Optionally, additional checking of the response status can be done here.
+ // For example, if the returned status code indicates a parsing or not
found error,
+ // you may return an error accordingly.
+
+ // Copy the retrieved schema dictionary from the response.
+ *schema_dict = std::make_shared<SchemaCloudDictionary>();
+ (*schema_dict)->Swap(resp.mutable_schema_dict());
+ VLOG_DEBUG << "Successfully obtained schema dict, index_id: " << index_id;
+ return Status::OK();
+}
+
#include "common/compile_check_end.h"
} // namespace doris::cloud
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index f0a1b1a6648..d06e55e69ad 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -16,6 +16,8 @@
// under the License.
#pragma once
+#include <gen_cpp/olap_file.pb.h>
+
#include <memory>
#include <string>
#include <tuple>
@@ -58,6 +60,8 @@ public:
Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>*
tablet_meta);
+ Status get_schema_dict(int64_t index_id,
std::shared_ptr<SchemaCloudDictionary>* schema_dict);
+
Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data =
false,
bool sync_delete_bitmap = true, bool full_sync
= false);
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 574e17fbb9b..9b74bdf7343 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -19,6 +19,7 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/cloud.pb.h>
+#include <gen_cpp/olap_file.pb.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/prettywriter.h>
@@ -37,6 +38,7 @@
#include "cloud/cloud_txn_delete_bitmap_cache.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
+#include "common/config.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
@@ -190,6 +192,9 @@ Status CloudStorageEngine::open() {
_tablet_hotspot = std::make_unique<TabletHotspot>();
+ _schema_cloud_dictionary_cache =
+
std::make_unique<SchemaCloudDictionaryCache>(config::schema_dict_cache_capacity);
+
RETURN_NOT_OK_STATUS_WITH_WARN(
init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
"init StreamLoadRecorder failed");
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index 7d8e68c5f79..f8dc58b6cd0 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -24,6 +24,7 @@
//#include "cloud/cloud_full_compaction.h"
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_tablet.h"
+#include "cloud/schema_cloud_dictionary_cache.h"
#include "cloud_txn_delete_bitmap_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/storage_engine.h"
@@ -69,6 +70,9 @@ public:
CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; }
CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return
*_txn_delete_bitmap_cache; }
+ SchemaCloudDictionaryCache& get_schema_cloud_dictionary_cache() {
+ return *_schema_cloud_dictionary_cache;
+ }
ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
return *_calc_tablet_delete_bitmap_task_thread_pool;
}
@@ -163,6 +167,7 @@ private:
std::unique_ptr<CloudTabletMgr> _tablet_mgr;
std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;
+ std::unique_ptr<SchemaCloudDictionaryCache> _schema_cloud_dictionary_cache;
// Components for cache warmup
std::unique_ptr<io::FileCacheBlockDownloader> _file_cache_block_downloader;
diff --git a/be/src/cloud/schema_cloud_dictionary_cache.cpp
b/be/src/cloud/schema_cloud_dictionary_cache.cpp
new file mode 100644
index 00000000000..25f0b232702
--- /dev/null
+++ b/be/src/cloud/schema_cloud_dictionary_cache.cpp
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/schema_cloud_dictionary_cache.h"
+
+#include <fmt/core.h>
+#include <gen_cpp/olap_file.pb.h>
+
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet.h"
+#include "common/config.h"
+#include "gen_cpp/cloud.pb.h" // For GetSchemaDictResponse
+#include "runtime/exec_env.h"
+
+namespace doris {
+
+bvar::Adder<int64_t> g_schema_dict_cache_count("schema_dict_cache_count");
+bvar::Adder<int64_t> g_replace_dict_keys_to_schema_hit_cache(
+ "schema_dict_cache_replace_dict_keys_to_schema_hit_count");
+bvar::Adder<int64_t> g_replace_schema_to_dict_keys_hit_cache(
+ "schema_dict_cache_replace_schema_to_dict_keys_hit_count");
+bvar::Adder<int64_t>
g_schema_dict_cache_miss_count("schema_dict_cache_miss_count");
+bvar::Adder<int64_t> g_schema_dict_refresh_count("schema_dict_refresh_count");
+
+void SchemaCloudDictionaryCache::_insert(int64_t index_id, const
SchemaCloudDictionarySPtr& dict) {
+ auto* value = new CacheValue;
+ value->dict = dict;
+ auto* lru_handle =
+ LRUCachePolicy::insert(fmt::format("{}", index_id), value, 1, 0,
CachePriority::NORMAL);
+ g_schema_dict_cache_count << 1;
+ _cache->release(lru_handle);
+}
+
+SchemaCloudDictionarySPtr SchemaCloudDictionaryCache::_lookup(int64_t
index_id) {
+ Cache::Handle* handle = LRUCachePolicy::lookup(fmt::format("{}",
index_id));
+ if (!handle) {
+ return nullptr;
+ }
+ auto* cache_val = static_cast<CacheValue*>(_cache->value(handle));
+ SchemaCloudDictionarySPtr dict = cache_val ? cache_val->dict : nullptr;
+ _cache->release(handle); // release handle but dict's shared_ptr still
alive
+ return dict;
+}
+
+/**
+ * Processes dictionary entries by matching items from the given item map.
+ * It maps items to their dictionary keys, then adds these keys to the rowset
metadata.
+ * If an item is missing in the dictionary, the dictionary key list in rowset
meta is cleared
+ * and the function returns a NotFound status.
+ *
+ * @tparam ItemPB The protobuf message type for dictionary items (e.g.,
ColumnPB or TabletIndexPB).
+ * @param dict The SchemaCloudDictionary that holds the dictionary entries.
+ * @param item_dict A mapping from unique identifiers to the dictionary items.
+ * @param result Pointer to a repeated field where filtered (non-extended)
items are stored. May be null.
+ * @param items The repeated field of items in the original rowset meta.
+ * @param filter A predicate that returns true for non-extended items, which are kept as is.
+ * @param add_dict_key_fn A function to be called for each valid item that
adds its key to the rowset meta.
+ * @param rowset_meta Pointer to the rowset metadata; it is cleared if any
item is not found.
+ *
+ * @return Status::OK if all items are processed successfully; otherwise, a
NotFound status.
+ */
+template <typename ItemPB>
+Status process_dictionary(SchemaCloudDictionary& dict,
+ const google::protobuf::Map<int32_t, ItemPB>&
item_dict,
+ google::protobuf::RepeatedPtrField<ItemPB>* result,
+ const google::protobuf::RepeatedPtrField<ItemPB>&
items,
+ const std::function<bool(const ItemPB&)>& filter,
+ const std::function<void(int32_t)>& add_dict_key_fn,
+ RowsetMetaCloudPB* rowset_meta) {
+ if (items.empty()) {
+ return Status::OK();
+ }
+ // Use deterministic method to do serialization since structure like
+ // `google::protobuf::Map`'s serialization is unstable
+ auto serialize_fn = [](const ItemPB& item) -> std::string {
+ std::string output;
+ google::protobuf::io::StringOutputStream string_output_stream(&output);
+ google::protobuf::io::CodedOutputStream
output_stream(&string_output_stream);
+ output_stream.SetSerializationDeterministic(true);
+ item.SerializeToCodedStream(&output_stream);
+ return output;
+ };
+
+ google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
+ std::unordered_map<std::string, int> reversed_dict;
+ for (const auto& [key, val] : item_dict) {
+ reversed_dict[serialize_fn(val)] = key;
+ }
+
+ for (const auto& item : items) {
+ if (filter(item)) {
+ // Keep non-extended items unchanged; only extended columns and extended indexes are looked up in the dict
+ *none_ext_items.Add() = item;
+ continue;
+ }
+ const std::string serialized_key = serialize_fn(item);
+ auto it = reversed_dict.find(serialized_key);
+ if (it == reversed_dict.end()) {
+ // If any required item is missing in the dictionary, clear the
dict key list and return NotFound.
+ // ATTN: need to clear dict key list let MS to add key list
+ rowset_meta->clear_schema_dict_key_list();
+ g_schema_dict_cache_miss_count << 1;
+ return Status::NotFound<false>("Not found entry in dict");
+ }
+ // Add existed dict key to related dict
+ add_dict_key_fn(it->second);
+ }
+ // clear extended items to prevent writing them to fdb
+ if (result != nullptr) {
+ result->Swap(&none_ext_items);
+ }
+ return Status::OK();
+}
+
+Status SchemaCloudDictionaryCache::replace_schema_to_dict_keys(int64_t
index_id,
+
RowsetMetaCloudPB* rowset_meta) {
+ if (!rowset_meta->has_variant_type_in_schema()) {
+ return Status::OK();
+ }
+ auto dict = _lookup(index_id);
+ if (!dict) {
+ g_schema_dict_cache_miss_count << 1;
+ return Status::NotFound<false>("Not found dict {}", index_id);
+ }
+ auto* dict_list = rowset_meta->mutable_schema_dict_key_list();
+ // Process column dictionary: add keys for extended columns (unique_id < 0).
+ auto column_filter = [&](const doris::ColumnPB& col) -> bool { return
col.unique_id() >= 0; };
+ auto column_dict_adder = [&](int32_t key) {
dict_list->add_column_dict_key_list(key); };
+ RETURN_IF_ERROR(process_dictionary<ColumnPB>(
+ *dict, dict->column_dict(),
rowset_meta->mutable_tablet_schema()->mutable_column(),
+ rowset_meta->tablet_schema().column(), column_filter,
column_dict_adder, rowset_meta));
+
+ // Process index dictionary: add keys for indexes with a non-empty index_suffix_name.
+ auto index_filter = [&](const doris::TabletIndexPB& index_pb) -> bool {
+ return index_pb.index_suffix_name().empty();
+ };
+ auto index_dict_adder = [&](int32_t key) {
dict_list->add_index_info_dict_key_list(key); };
+ RETURN_IF_ERROR(process_dictionary<doris::TabletIndexPB>(
+ *dict, dict->index_dict(),
rowset_meta->mutable_tablet_schema()->mutable_index(),
+ rowset_meta->tablet_schema().index(), index_filter,
index_dict_adder, rowset_meta));
+ g_replace_schema_to_dict_keys_hit_cache << 1;
+ return Status::OK();
+}
+
+Status SchemaCloudDictionaryCache::_try_fill_schema(
+ const std::shared_ptr<SchemaCloudDictionary>& dict, const
SchemaDictKeyList& dict_keys,
+ TabletSchemaCloudPB* schema) {
+ // Process column dictionary keys
+ for (int key : dict_keys.column_dict_key_list()) {
+ auto it = dict->column_dict().find(key);
+ if (it == dict->column_dict().end()) {
+ return Status::NotFound<false>("Column dict key {} not found",
key);
+ }
+ *schema->add_column() = it->second;
+ }
+ // Process index dictionary keys
+ for (int key : dict_keys.index_info_dict_key_list()) {
+ auto it = dict->index_dict().find(key);
+ if (it == dict->index_dict().end()) {
+ return Status::NotFound<false>("Index dict key {} not found", key);
+ }
+ *schema->add_index() = it->second;
+ }
+ return Status::OK();
+}
+
+Status SchemaCloudDictionaryCache::refresh_dict(int64_t index_id,
+ SchemaCloudDictionarySPtr*
new_dict) {
+ // Fetch the dictionary through the meta manager and insert it into the cache.
+ auto refresh_dict = std::make_shared<SchemaCloudDictionary>();
+ RETURN_IF_ERROR(static_cast<const
CloudStorageEngine&>(ExecEnv::GetInstance()->storage_engine())
+ .meta_mgr()
+ .get_schema_dict(index_id, &refresh_dict));
+ _insert(index_id, refresh_dict);
+ if (new_dict != nullptr) {
+ *new_dict = refresh_dict;
+ }
+ LOG(INFO) << "refresh dict for index_id=" << index_id;
+ g_schema_dict_refresh_count << 1;
+ return Status::OK();
+}
+
+Status SchemaCloudDictionaryCache::replace_dict_keys_to_schema(int64_t
index_id,
+
RowsetMetaCloudPB* out) {
+ // First attempt: use the current cached dictionary
+ SchemaCloudDictionarySPtr dict = _lookup(index_id);
+ Status st =
+ dict ? _try_fill_schema(dict, out->schema_dict_key_list(),
out->mutable_tablet_schema())
+ : Status::NotFound<false>("Schema dict not found in cache");
+
+ // If filling fails (possibly due to outdated dictionary data), refresh
the dictionary
+ if (!st.ok()) {
+ g_schema_dict_cache_miss_count << 1;
+ RETURN_IF_ERROR(refresh_dict(index_id, &dict));
+ if (!dict) {
+ return Status::NotFound<false>("Schema dict not found after
refresh, index_id={}",
+ index_id);
+ }
+ // Retry filling the schema with the refreshed dictionary
+ st = _try_fill_schema(dict, out->schema_dict_key_list(),
out->mutable_tablet_schema());
+ }
+ g_replace_dict_keys_to_schema_hit_cache << 1;
+ return st;
+}
+
+} // namespace doris
diff --git a/be/src/cloud/schema_cloud_dictionary_cache.h
b/be/src/cloud/schema_cloud_dictionary_cache.h
new file mode 100644
index 00000000000..ed21b7db909
--- /dev/null
+++ b/be/src/cloud/schema_cloud_dictionary_cache.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/olap_file.pb.h>
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "runtime/memory/lru_cache_policy.h"
+
+namespace doris {
+
+class SchemaCloudDictionary;
+class RowsetMetaCloudPB;
+
+using SchemaCloudDictionarySPtr = std::shared_ptr<SchemaCloudDictionary>;
+
+/*
+ * SchemaCloudDictionaryCache provides a local cache for SchemaCloudDictionary.
+ *
+ * Caching logic:
+ * - If the dictionary associated with a given key has not had any new
columns added
+ * (determined by comparing the serialized data for consistency),
+ * the cached dictionary is directly used to update the dictionary list in
the rowset meta
+ * (similar to the process_dictionary logic in write_schema_dict).
+ * - If new columns have been detected, the local cache is disregarded, and
the updated
+ * dictionary should be fetched via the meta service.
+ */
+class SchemaCloudDictionaryCache : public LRUCachePolicy {
+public:
+ SchemaCloudDictionaryCache(size_t capacity)
+ :
LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE, capacity,
+ LRUCacheType::NUMBER, 512) {}
+ /**
+ * Refreshes the dictionary for the given index_id by calling an RPC via
the meta manager.
+ * The refreshed dictionary is then inserted into the cache.
+ *
+ * @param index_id The identifier for the index.
+ * @param new_dict Optional output parameter; if provided, it will be set
to point to the refreshed dictionary.
+ *
+ * @return Status::OK if the dictionary is successfully refreshed;
otherwise, an error status.
+ */
+ virtual Status refresh_dict(int64_t index_id, SchemaCloudDictionarySPtr*
new_dict = nullptr);
+
+ /**
+ * Fills the schema dict key list of the given RowsetMetaCloudPB using the cached dictionary.
+ * No RPC is issued here; a missing dictionary or entry is reported as NotFound.
+ *
+ * @param index_id The identifier for the index.
+ * @param out Pointer to the RowsetMetaCloudPB whose schema_dict_key_list will be filled.
+ *
+ * @return Status::OK on success or if the schema has no variant type; otherwise NotFound.
+ */
+ Status replace_schema_to_dict_keys(int64_t index_id, RowsetMetaCloudPB*
out);
+
+ /**
+ * Replaces dictionary keys in the given RowsetMetaCloudPB by using the
cached dictionary.
+ * If the cached dictionary is missing or its data is outdated (i.e.
missing required keys),
+ * an RPC call is triggered to refresh the dictionary, which is then used
to fill the tablet schema.
+ *
+ * @param index_id The identifier for the index.
+ * @param out Pointer to the RowsetMetaCloudPB whose tablet schema will be
updated.
+ *
+ * @return Status::OK if the tablet schema is successfully updated;
otherwise, an error status.
+ */
+ Status replace_dict_keys_to_schema(int64_t index_id, RowsetMetaCloudPB*
out);
+
+private:
+ // ut
+ friend class FakeSchemaCloudDictionaryCache;
+ // insert dict
+ void _insert(int64_t index_id, const SchemaCloudDictionarySPtr& dict);
+ // lookup dict
+ SchemaCloudDictionarySPtr _lookup(int64_t index_id);
+ // Attempts to fill the tablet schema information in a
SchemaCloudDictionary into a TabletSchemaCloudPB
+ // based on a given set of dictionary keys. If any required key is missing
in the dictionary, a NotFound status is returned.
+ Status _try_fill_schema(const SchemaCloudDictionarySPtr& dict,
+ const SchemaDictKeyList& dict_keys,
TabletSchemaCloudPB* schema);
+ struct CacheValue : public LRUCacheValueBase {
+ SchemaCloudDictionarySPtr dict;
+ };
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 6526061ff44..301a82336b7 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1029,7 +1029,7 @@ DEFINE_Bool(enable_workload_group_for_scan, "false");
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
// Whether use schema dict in backend side instead of MetaService side(cloud
mode)
-DEFINE_mBool(variant_use_cloud_schema_dict, "true");
+DEFINE_mBool(variant_use_cloud_schema_dict_cache, "true");
DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
@@ -1437,6 +1437,7 @@ DEFINE_mInt32(compaction_num_per_round, "1");
DEFINE_mInt32(check_tablet_delete_bitmap_interval_seconds, "300");
DEFINE_mInt32(check_tablet_delete_bitmap_score_top_n, "10");
DEFINE_mBool(enable_check_tablet_delete_bitmap_score, "true");
+DEFINE_mInt32(schema_dict_cache_capacity, "4096");
// clang-format off
#ifdef BE_TEST
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d5299393187..845488139be 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1234,7 +1234,7 @@ DECLARE_mInt64(LZ4_HC_compression_level);
// Threshold of a column as sparse column
// Notice: TEST ONLY
DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column);
-DECLARE_mBool(variant_use_cloud_schema_dict);
+DECLARE_mBool(variant_use_cloud_schema_dict_cache);
// Threshold to estimate a column is sparsed
// Notice: TEST ONLY
DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
@@ -1512,6 +1512,7 @@ DECLARE_mInt32(compaction_num_per_round);
DECLARE_mInt32(check_tablet_delete_bitmap_interval_seconds);
DECLARE_mInt32(check_tablet_delete_bitmap_score_top_n);
DECLARE_mBool(enable_check_tablet_delete_bitmap_score);
+DECLARE_mInt32(schema_dict_cache_capacity);
#ifdef BE_TEST
// test s3
diff --git a/be/src/runtime/memory/cache_policy.h
b/be/src/runtime/memory/cache_policy.h
index 72e61fed2e0..f7f4bda34ff 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -17,6 +17,8 @@
#pragma once
+#include <gen_cpp/olap_file.pb.h>
+
#include <vector>
#include "util/runtime_profile.h"
@@ -52,6 +54,7 @@ public:
FOR_UT_CACHE_NUMBER = 19,
QUERY_CACHE = 20,
TABLET_COLUMN_OBJECT_POOL = 21,
+ SCHEMA_CLOUD_DICTIONARY_CACHE = 22,
};
static std::string type_string(CacheType type) {
@@ -98,6 +101,8 @@ public:
return "QueryCache";
case CacheType::TABLET_COLUMN_OBJECT_POOL:
return "TabletColumnObjectPool";
+ case CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE:
+ return "SchemaCloudDictionaryCache";
default:
throw Exception(Status::FatalError("not match type of cache policy
:{}",
static_cast<int>(type)));
@@ -126,7 +131,9 @@ public:
{"CloudTxnDeleteBitmapCache",
CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE},
{"ForUTCacheNumber", CacheType::FOR_UT_CACHE_NUMBER},
{"QueryCache", CacheType::QUERY_CACHE},
- {"TabletColumnObjectPool", CacheType::TABLET_COLUMN_OBJECT_POOL}};
+ {"TabletColumnObjectPool", CacheType::TABLET_COLUMN_OBJECT_POOL},
+ {"SchemaCloudDictionaryCache",
CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE},
+ };
static CacheType string_to_type(std::string type) {
if (StringToType.contains(type)) {
diff --git a/be/test/cloud/test_schema_cloud_dictionary_cache.cpp
b/be/test/cloud/test_schema_cloud_dictionary_cache.cpp
new file mode 100644
index 00000000000..3d05eb67e45
--- /dev/null
+++ b/be/test/cloud/test_schema_cloud_dictionary_cache.cpp
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/schema_cloud_dictionary_cache.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "gtest/gtest.h"
+
+namespace doris {
+
+using SchemaCloudDictionarySPtr = std::shared_ptr<SchemaCloudDictionary>;
+
+/*
+ * FakeSchemaCloudDictionaryCache is a test subclass of
+ * SchemaCloudDictionaryCache. It exposes TestInsert() so tests can inject
+ * dictionary entries directly, and it overrides refresh_dict() so that a
+ * refresh either succeeds (inserting a known-valid dictionary) or fails,
+ * depending on simulate_refresh_success.
+ */
+class FakeSchemaCloudDictionaryCache : public SchemaCloudDictionaryCache {
+public:
+ FakeSchemaCloudDictionaryCache(size_t capacity) :
SchemaCloudDictionaryCache(capacity) {}
+
+ // For unit testing, we override refresh_dict to simulate different
scenarios.
+ // (Assume the base class method is declared virtual for testing or we
hide it in our subclass)
+ Status refresh_dict(int64_t index_id, SchemaCloudDictionarySPtr* new_dict
= nullptr) override {
+ if (simulate_refresh_success) {
+ // Simulate a successful refresh by creating a valid dictionary.
+ SchemaCloudDictionarySPtr valid_dict = createValidDictionary();
+ // Store the refreshed dictionary in the cache under index_id.
+ TestInsert(index_id, valid_dict);
+ if (new_dict) {
+ *new_dict = valid_dict; // also hand the refreshed dictionary back to the caller
+ }
+ return Status::OK();
+ } else {
+ return Status::InternalError("Simulated refresh failure");
+ }
+ }
+
+ // Public wrapper for injection (assume _insert is accessible, e.g.
changed to protected for unit test)
+ void TestInsert(int64_t index_id, const SchemaCloudDictionarySPtr& dict) {
+ _insert(index_id, dict);
+ }
+
+ // Selects the outcome of refresh_dict: true -> OK, false -> InternalError.
+ bool simulate_refresh_success = true;
+
+ // Build the dictionary the tests treat as valid: column key 101, index key 201.
+ static SchemaCloudDictionarySPtr createValidDictionary() {
+ auto* dict = new SchemaCloudDictionary();
+ // Populate valid column entry with key 101.
+ auto& col_dict = *dict->mutable_column_dict();
+ ColumnPB* col_pb = &(col_dict)[101];
+ col_pb->set_unique_id(101);
+ // Populate valid index entry with key 201. Set index_suffix_name to
empty.
+ auto& idx_dict = *dict->mutable_index_dict();
+ TabletIndexPB* idx_pb = &(idx_dict)[201];
+ idx_pb->set_index_suffix_name("");
+ return SchemaCloudDictionarySPtr(dict); // the shared_ptr takes ownership of dict
+ }
+
+ // Build a dictionary the tests treat as invalid: it lacks column key 101.
+ static SchemaCloudDictionarySPtr createInvalidDictionary() {
+ auto* dict = new SchemaCloudDictionary();
+ // Insert the column under the wrong key (999) instead of the expected 101.
+ auto& col_dict = *dict->mutable_column_dict();
+ ColumnPB* col_pb = &(col_dict)[999];
+ col_pb->set_unique_id(999);
+ // The index entry is the same as in the valid dictionary (key 201).
+ auto& idx_dict = *dict->mutable_index_dict();
+ TabletIndexPB* idx_pb = &(idx_dict)[201];
+ idx_pb->set_index_suffix_name("");
+ return SchemaCloudDictionarySPtr(dict); // the shared_ptr takes ownership of dict
+ }
+};
+
+// Test case 1: the cached dictionary contains every referenced key, so replace_dict_keys_to_schema is expected to return OK.
+TEST(SchemaCloudDictionaryCacheTest, ReplaceDictKeysToSchema_ValidCache) {
+ int64_t index_id = 100;
+ FakeSchemaCloudDictionaryCache cache(10);
+ // Inject a valid dictionary (column key 101, index key 201) for index_id.
+ SchemaCloudDictionarySPtr valid_dict =
FakeSchemaCloudDictionaryCache::createValidDictionary();
+ cache.TestInsert(index_id, valid_dict);
+
+ // Create a RowsetMetaCloudPB with schema dictionary key list.
+ RowsetMetaCloudPB rs_meta;
+ // Reference the same keys the valid dictionary holds: column 101 and index 201.
+ SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list();
+ dict_keys->add_column_dict_key_list(101);
+ dict_keys->add_index_info_dict_key_list(201);
+ // Ensure tablet schema message is created.
+ rs_meta.mutable_tablet_schema();
+
+ // Run the call under test against the pre-populated cache.
+ Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta);
+ EXPECT_TRUE(st.ok());
+
+ // Exactly one column and one index should have been filled into the schema.
+ const TabletSchemaCloudPB& schema = rs_meta.tablet_schema();
+ EXPECT_EQ(schema.column_size(), 1);
+ EXPECT_EQ(schema.index_size(), 1);
+}
+
+// Test case 2: the cached dictionary lacks a referenced column key; with a successful simulated refresh the call is expected to return OK.
+TEST(SchemaCloudDictionaryCacheTest,
ReplaceDictKeysToSchema_InvalidCache_ThenRefresh) {
+ int64_t index_id = 200;
+ FakeSchemaCloudDictionaryCache cache(10);
+ // Inject an invalid dictionary (missing required column key 101).
+ SchemaCloudDictionarySPtr invalid_dict =
+ FakeSchemaCloudDictionaryCache::createInvalidDictionary();
+ cache.TestInsert(index_id, invalid_dict);
+
+ // Create a rowset meta whose key list references column 101 and index 201.
+ RowsetMetaCloudPB rs_meta;
+ SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list();
+ dict_keys->add_column_dict_key_list(101); // invalid dict does not contain
101.
+ dict_keys->add_index_info_dict_key_list(201);
+ rs_meta.mutable_tablet_schema();
+
+ cache.simulate_refresh_success = true; // the overridden refresh_dict will return OK
+ Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta);
+ EXPECT_TRUE(st.ok());
+
+ // After refresh, the valid dictionary should be used: one column, one index.
+ const TabletSchemaCloudPB& schema = rs_meta.tablet_schema();
+ EXPECT_EQ(schema.column_size(), 1);
+ EXPECT_EQ(schema.index_size(), 1);
+}
+
+// Test case 3: nothing is cached for index_id; with a successful simulated refresh the call is expected to return OK.
+TEST(SchemaCloudDictionaryCacheTest,
ReplaceDictKeysToSchema_NoCache_ThenRefresh) {
+ int64_t index_id = 300;
+ FakeSchemaCloudDictionaryCache cache(10);
+ // No TestInsert here: the cache starts without a dictionary for index_id.
+ RowsetMetaCloudPB rs_meta;
+ SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list();
+ dict_keys->add_column_dict_key_list(101);
+ dict_keys->add_index_info_dict_key_list(201);
+ rs_meta.mutable_tablet_schema();
+
+ // Let the overridden refresh_dict succeed and insert the valid dictionary.
+ cache.simulate_refresh_success = true;
+ Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta);
+ EXPECT_TRUE(st.ok());
+
+ const TabletSchemaCloudPB& schema = rs_meta.tablet_schema();
+ EXPECT_EQ(schema.column_size(), 1); // column key 101 resolved
+ EXPECT_EQ(schema.index_size(), 1); // index key 201 resolved
+}
+
+// Test case 4: nothing is cached and the simulated refresh fails, so replace_dict_keys_to_schema must return an error.
+TEST(SchemaCloudDictionaryCacheTest, ReplaceDictKeysToSchema_RefreshFailure) {
+ int64_t index_id = 400;
+ FakeSchemaCloudDictionaryCache cache(10);
+ // Nothing is inserted, so the cache holds no dictionary for index_id.
+ RowsetMetaCloudPB rs_meta;
+ SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list();
+ dict_keys->add_column_dict_key_list(101);
+ dict_keys->add_index_info_dict_key_list(201);
+ rs_meta.mutable_tablet_schema();
+
+ cache.simulate_refresh_success = false; // the overridden refresh_dict will return InternalError
+ Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta);
+ EXPECT_FALSE(st.ok());
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 6385fc7c9e8..fe887760f7f 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -85,12 +85,12 @@ BvarLatencyRecorderWithTag
g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job"
BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms",
"get_cluster_status");
BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms",
"set_cluster_status");
BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv");
+BvarLatencyRecorderWithTag g_bvar_ms_get_schema_dict("ms", "get_schema_dict");
bvar::Adder<int64_t> g_bvar_update_delete_bitmap_fail_counter;
bvar::Window<bvar::Adder<int64_t> >
g_bvar_update_delete_bitmap_fail_counter_minute("ms",
"update_delete_bitmap_fail", &g_bvar_update_delete_bitmap_fail_counter, 60);
bvar::Adder<int64_t> g_bvar_get_delete_bitmap_fail_counter;
bvar::Window<bvar::Adder<int64_t> >
g_bvar_get_delete_bitmap_fail_counter_minute("ms", "get_delete_bitmap_fail",
&g_bvar_get_delete_bitmap_fail_counter, 60);
-
// recycler's bvars
// TODO: use mbvar for per instance,
https://github.com/apache/brpc/blob/master/docs/cn/mbvar_c++.md
BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_index_earlest_ts("recycler",
"recycle_index_earlest_ts");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index ff1d3520b30..7f616615394 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -160,6 +160,7 @@ extern BvarLatencyRecorderWithTag
g_bvar_ms_get_rl_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
+extern BvarLatencyRecorderWithTag g_bvar_ms_get_schema_dict;
extern bvar::Adder<int64_t> g_bvar_update_delete_bitmap_fail_counter;
extern bvar::Adder<int64_t> g_bvar_get_delete_bitmap_fail_counter;
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index bc8af94496a..ebe46e4a438 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1691,7 +1691,7 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
}
}
- if (need_read_schema_dict) {
+ if (need_read_schema_dict && request->schema_op() !=
GetRowsetRequest::NO_DICT) {
read_schema_dict(code, msg, instance_id, idx.index_id(), txn.get(),
response->mutable_rowset_meta(),
response->mutable_schema_dict(),
request->schema_op());
@@ -2589,4 +2589,57 @@ std::size_t get_segments_key_bounds_bytes(const
doris::RowsetMetaCloudPB& rowset
return ret;
}
+// Handles the get_schema_dict RPC: reads the SchemaCloudDictionary stored under
+// the key built from (instance_id, request->index_id()) and hands it back in
+// response->schema_dict.
+//
+// Returns early with an error code/msg when:
+//  - no instance can be resolved from cloud_unique_id, or the request has no
+//    index_id (INVALID_ARGUMENT);
+//  - the KV transaction cannot be created, or the read fails with anything
+//    other than "key not found";
+//  - the stored value cannot be parsed (PROTOBUF_PARSE_ERR).
+// A missing key is not treated as a failure here: the response then carries a
+// default-constructed (empty) dictionary.
+void MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController*
controller,
+ const GetSchemaDictRequest* request,
+ GetSchemaDictResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(get_schema_dict);
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(WARNING) << msg << ", cloud_unique_id=" <<
request->cloud_unique_id();
+ return;
+ }
+
+ if (!request->has_index_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "missing index_id in request";
+ return;
+ }
+
+ RPC_RATE_LIMIT(get_schema_dict)
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+
+ std::string dict_key = meta_schema_pb_dictionary_key({instance_id,
request->index_id()});
+ ValueBuf dict_val;
+ err = cloud::get(txn.get(), dict_key, &dict_val);
+ LOG(INFO) << "Retrieved column pb dictionary, index_id=" <<
request->index_id()
+ << " key=" << hex(dict_key) << " error=" << err;
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK)
{
+ // Anything other than OK / key-not-found is a genuine read failure.
+ // The value printed is the index id, so label it as such (it used to be
+ // reported as "table_id", disagreeing with the INFO log above).
+ ss << "Failed to retrieve column pb dictionary, instance_id=" <<
instance_id
+ << " index_id=" << request->index_id() << " key=" << hex(dict_key)
<< " error=" << err;
+ msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ return;
+ }
+ SchemaCloudDictionary schema_dict;
+ if (err == TxnErrorCode::TXN_OK && !dict_val.to_pb(&schema_dict)) {
+ // The key exists but its value is not a parsable SchemaCloudDictionary.
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("Malformed tablet dictionary value, key={}",
hex(dict_key));
+ return;
+ }
+
+ // On key-not-found schema_dict is still default-constructed, so an empty
+ // dictionary is what gets swapped into the response.
+ response->mutable_schema_dict()->Swap(&schema_dict);
+}
+
} // namespace doris::cloud
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index 5374cbea741..6df09bd2c20 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -301,6 +301,10 @@ public:
void get_txn_id(::google::protobuf::RpcController* controller, const
GetTxnIdRequest* request,
GetTxnIdResponse* response, ::google::protobuf::Closure*
done) override;
+ void get_schema_dict(::google::protobuf::RpcController* controller,
+ const GetSchemaDictRequest* request,
GetSchemaDictResponse* response,
+ ::google::protobuf::Closure* done) override;
+
// ATTN: If you add a new method, please also add the corresponding
implementation in `MetaServiceProxy`.
std::pair<MetaServiceCode, std::string> get_instance_info(const
std::string& instance_id,
@@ -694,6 +698,12 @@ public:
call_impl(&cloud::MetaService::get_txn_id, controller, request,
response, done);
}
+ void get_schema_dict(::google::protobuf::RpcController* controller,
+ const GetSchemaDictRequest* request,
GetSchemaDictResponse* response,
+ ::google::protobuf::Closure* done) override {
+ call_impl(&cloud::MetaService::get_schema_dict, controller, request,
response, done); // thin forwarder: passes all four arguments to call_impl
+ }
+
private:
template <typename Request, typename Response>
using MetaServiceMethod = void
(cloud::MetaService::*)(::google::protobuf::RpcController*,
diff --git a/cloud/src/meta-service/meta_service_schema.cpp
b/cloud/src/meta-service/meta_service_schema.cpp
index ff88e82cf20..bbbedd1e3ab 100644
--- a/cloud/src/meta-service/meta_service_schema.cpp
+++ b/cloud/src/meta-service/meta_service_schema.cpp
@@ -212,6 +212,10 @@ void process_dictionary(SchemaCloudDictionary& dict,
// such restored schema in fdb. And also add extra dict key info to
RowsetMetaCloudPB.
void write_schema_dict(MetaServiceCode& code, std::string& msg, const
std::string& instance_id,
Transaction* txn, RowsetMetaCloudPB* rowset_meta) {
+ // if schema_dict_key_list is not empty, then the schema already replaced
in BE side, and no need to update dict
+ if (rowset_meta->has_schema_dict_key_list()) {
+ return;
+ }
std::stringstream ss;
// wrtie dict to rowset meta and update dict
SchemaCloudDictionary dict;
@@ -311,6 +315,7 @@ void write_schema_dict(MetaServiceCode& code, std::string&
msg, const std::strin
cloud::put(txn, dict_key, dict_val, 0);
LOG(INFO) << "Dictionary saved, key=" << hex(dict_key)
<< " txn_id=" << rowset_meta->txn_id() << " Dict size=" <<
dict.column_dict_size()
+ << ", index_id=" << rowset_meta->index_id()
<< ", Current column ID=" << dict.current_column_dict_id()
<< ", Current index ID=" << dict.current_index_dict_id()
<< ", Dict bytes=" << dict_val.size();
@@ -322,7 +327,6 @@ void read_schema_dict(MetaServiceCode& code, std::string&
msg, const std::string
google::protobuf::RepeatedPtrField<doris::RowsetMetaCloudPB>* rsp_metas,
SchemaCloudDictionary* rsp_dict,
GetRowsetRequest::SchemaOp schema_op) {
std::stringstream ss;
-
// read dict if any rowset has dict key list
SchemaCloudDictionary dict;
std::string column_dict_key = meta_schema_pb_dictionary_key({instance_id,
index_id});
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 5d541547b97..4b9ad50d5d5 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -995,6 +995,16 @@ message GetRowsetResponse {
optional SchemaCloudDictionary schema_dict = 4;
}
+message GetSchemaDictRequest { // request of MetaService.get_schema_dict
+ optional string cloud_unique_id = 1; // For auth; the service resolves the instance from it
+ optional int64 index_id = 2; // index whose schema dictionary is requested; the service rejects requests without it
+}
+
+message GetSchemaDictResponse { // response of MetaService.get_schema_dict
+ optional MetaServiceResponseStatus status = 1;
+ optional SchemaCloudDictionary schema_dict = 2; // dictionary read for the requested index_id
+}
+
message IndexRequest {
optional string cloud_unique_id = 1; // For auth
repeated int64 index_ids = 2;
@@ -1390,6 +1400,8 @@ enum MetaServiceCode {
// The meta service retries KV_TXN_CONFLICT error but is exceeded the max
times.
KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES = 10001;
+ SCHEMA_DICT_NOT_FOUND = 11001;
+
UNDEFINED_ERR = 1000000;
}
@@ -1554,6 +1566,7 @@ service MetaService {
rpc commit_rowset(CreateRowsetRequest) returns (CreateRowsetResponse);
rpc update_tmp_rowset(CreateRowsetRequest) returns (CreateRowsetResponse);
rpc get_rowset(GetRowsetRequest) returns (GetRowsetResponse);
+ rpc get_schema_dict(GetSchemaDictRequest) returns (GetSchemaDictResponse);
rpc prepare_index(IndexRequest) returns (IndexResponse);
rpc commit_index(IndexRequest) returns (IndexResponse);
rpc drop_index(IndexRequest) returns (IndexResponse);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]