github-actions[bot] commented on code in PR #30089: URL: https://github.com/apache/doris/pull/30089#discussion_r1457069082
########## be/src/cloud/cloud_tablet_mgr.cpp: ########## @@ -0,0 +1,356 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet_mgr.h" + +#include <bthread/countdown_event.h> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/config.h" +#include "common/status.h" +#include "olap/lru_cache.h" +#include "runtime/memory/cache_policy.h" + +namespace doris { +namespace { + +// port from +// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go +template <typename Key, typename Val> +class SingleFlight { +public: + SingleFlight() = default; + + SingleFlight(const SingleFlight&) = delete; + void operator=(const SingleFlight&) = delete; + + using Loader = std::function<Val(const Key&)>; + + // Do executes and returns the results of the given function, making + // sure that only one execution is in-flight for a given key at a + // time. If a duplicate comes in, the duplicate caller waits for the + // original to complete and receives the same results. + Val load(const Key& key, Loader loader) { + std::unique_lock lock(_call_map_mtx); + + auto it = _call_map.find(key); + if (it != _call_map.end()) { + auto call = it->second; + lock.unlock(); + if (int ec = call->event.wait(); ec != 0) { + throw std::system_error(std::error_code(ec, std::system_category()), + "CountdownEvent wait failed"); + } + return call->val; + } + auto call = std::make_shared<Call>(); + _call_map.emplace(key, call); + lock.unlock(); + + call->val = loader(key); + call->event.signal(); + + lock.lock(); + _call_map.erase(key); + lock.unlock(); + + return call->val; + } + +private: + // `Call` is an in-flight or completed `load` call + struct Call { + bthread::CountdownEvent event; + Val val; + }; + + std::mutex _call_map_mtx; + std::unordered_map<Key, std::shared_ptr<Call>> _call_map; +}; + +SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>> s_singleflight_load_tablet; + +} // namespace + +// tablet_id -> cached tablet +// This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle. +// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order. +// TODO(plat1ko): multi shard to increase concurrency +class CloudTabletMgr::TabletMap { +public: + void put(std::shared_ptr<CloudTablet> tablet) { + std::lock_guard lock(_mtx); + _map[tablet->tablet_id()] = std::move(tablet); + } + + void erase(CloudTablet* tablet) { + std::lock_guard lock(_mtx); + auto it = _map.find(tablet->tablet_id()); + // According to the implementation of `LRUCache`, `deleter` may be called after a tablet + // with same tablet id insert into cache and `TabletMap`. So we MUST check if the tablet + // instance to be erased is the same one in the map. + if (it != _map.end() && it->second.get() == tablet) { + _map.erase(it); + } + } + + std::shared_ptr<CloudTablet> get(int64_t tablet_id) { Review Comment: warning: method 'get' can be made static [readability-convert-member-functions-to-static] ```suggestion static std::shared_ptr<CloudTablet> get(int64_t tablet_id) { ``` ########## be/src/cloud/cloud_tablet_mgr.cpp: ########## @@ -0,0 +1,356 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet_mgr.h" + +#include <bthread/countdown_event.h> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/config.h" +#include "common/status.h" +#include "olap/lru_cache.h" +#include "runtime/memory/cache_policy.h" + +namespace doris { +namespace { + +// port from +// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go +template <typename Key, typename Val> +class SingleFlight { +public: + SingleFlight() = default; + + SingleFlight(const SingleFlight&) = delete; + void operator=(const SingleFlight&) = delete; + + using Loader = std::function<Val(const Key&)>; + + // Do executes and returns the results of the given function, making + // sure that only one execution is in-flight for a given key at a + // time. If a duplicate comes in, the duplicate caller waits for the + // original to complete and receives the same results. + Val load(const Key& key, Loader loader) { + std::unique_lock lock(_call_map_mtx); + + auto it = _call_map.find(key); + if (it != _call_map.end()) { + auto call = it->second; + lock.unlock(); + if (int ec = call->event.wait(); ec != 0) { + throw std::system_error(std::error_code(ec, std::system_category()), + "CountdownEvent wait failed"); + } + return call->val; + } + auto call = std::make_shared<Call>(); + _call_map.emplace(key, call); + lock.unlock(); + + call->val = loader(key); + call->event.signal(); + + lock.lock(); + _call_map.erase(key); + lock.unlock(); + + return call->val; + } + +private: + // `Call` is an in-flight or completed `load` call + struct Call { + bthread::CountdownEvent event; + Val val; + }; + + std::mutex _call_map_mtx; + std::unordered_map<Key, std::shared_ptr<Call>> _call_map; +}; + +SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>> s_singleflight_load_tablet; + +} // namespace + +// tablet_id -> cached tablet +// This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle. +// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order. +// TODO(plat1ko): multi shard to increase concurrency +class CloudTabletMgr::TabletMap { +public: + void put(std::shared_ptr<CloudTablet> tablet) { Review Comment: warning: method 'put' can be made static [readability-convert-member-functions-to-static] ```suggestion static void put(std::shared_ptr<CloudTablet> tablet) { ``` ########## be/src/cloud/cloud_tablet_mgr.cpp: ########## @@ -0,0 +1,356 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet_mgr.h" + +#include <bthread/countdown_event.h> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/config.h" +#include "common/status.h" +#include "olap/lru_cache.h" +#include "runtime/memory/cache_policy.h" + +namespace doris { +namespace { + +// port from +// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go +template <typename Key, typename Val> +class SingleFlight { +public: + SingleFlight() = default; + + SingleFlight(const SingleFlight&) = delete; + void operator=(const SingleFlight&) = delete; + + using Loader = std::function<Val(const Key&)>; + + // Do executes and returns the results of the given function, making + // sure that only one execution is in-flight for a given key at a + // time. If a duplicate comes in, the duplicate caller waits for the + // original to complete and receives the same results. + Val load(const Key& key, Loader loader) { + std::unique_lock lock(_call_map_mtx); + + auto it = _call_map.find(key); + if (it != _call_map.end()) { + auto call = it->second; + lock.unlock(); + if (int ec = call->event.wait(); ec != 0) { + throw std::system_error(std::error_code(ec, std::system_category()), + "CountdownEvent wait failed"); + } + return call->val; + } + auto call = std::make_shared<Call>(); + _call_map.emplace(key, call); + lock.unlock(); + + call->val = loader(key); + call->event.signal(); + + lock.lock(); + _call_map.erase(key); + lock.unlock(); + + return call->val; + } + +private: + // `Call` is an in-flight or completed `load` call + struct Call { + bthread::CountdownEvent event; + Val val; + }; + + std::mutex _call_map_mtx; + std::unordered_map<Key, std::shared_ptr<Call>> _call_map; +}; + +SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>> s_singleflight_load_tablet; + +} // namespace + +// tablet_id -> cached tablet +// This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle. +// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order. +// TODO(plat1ko): multi shard to increase concurrency +class CloudTabletMgr::TabletMap { +public: + void put(std::shared_ptr<CloudTablet> tablet) { + std::lock_guard lock(_mtx); + _map[tablet->tablet_id()] = std::move(tablet); + } + + void erase(CloudTablet* tablet) { Review Comment: warning: method 'erase' can be made static [readability-convert-member-functions-to-static] ```suggestion static void erase(CloudTablet* tablet) { ``` ########## be/src/cloud/cloud_tablet_mgr.cpp: ########## @@ -0,0 +1,356 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet_mgr.h" + +#include <bthread/countdown_event.h> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/config.h" +#include "common/status.h" +#include "olap/lru_cache.h" +#include "runtime/memory/cache_policy.h" + +namespace doris { +namespace { + +// port from +// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go +template <typename Key, typename Val> +class SingleFlight { +public: + SingleFlight() = default; + + SingleFlight(const SingleFlight&) = delete; + void operator=(const SingleFlight&) = delete; + + using Loader = std::function<Val(const Key&)>; + + // Do executes and returns the results of the given function, making + // sure that only one execution is in-flight for a given key at a + // time. If a duplicate comes in, the duplicate caller waits for the + // original to complete and receives the same results. + Val load(const Key& key, Loader loader) { + std::unique_lock lock(_call_map_mtx); + + auto it = _call_map.find(key); + if (it != _call_map.end()) { + auto call = it->second; + lock.unlock(); + if (int ec = call->event.wait(); ec != 0) { + throw std::system_error(std::error_code(ec, std::system_category()), + "CountdownEvent wait failed"); + } + return call->val; + } + auto call = std::make_shared<Call>(); + _call_map.emplace(key, call); + lock.unlock(); + + call->val = loader(key); + call->event.signal(); + + lock.lock(); + _call_map.erase(key); + lock.unlock(); + + return call->val; + } + +private: + // `Call` is an in-flight or completed `load` call + struct Call { + bthread::CountdownEvent event; + Val val; + }; + + std::mutex _call_map_mtx; + std::unordered_map<Key, std::shared_ptr<Call>> _call_map; +}; + +SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>> s_singleflight_load_tablet; + +} // namespace + +// tablet_id -> cached tablet +// This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle. +// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order. +// TODO(plat1ko): multi shard to increase concurrency +class CloudTabletMgr::TabletMap { +public: + void put(std::shared_ptr<CloudTablet> tablet) { + std::lock_guard lock(_mtx); + _map[tablet->tablet_id()] = std::move(tablet); + } + + void erase(CloudTablet* tablet) { + std::lock_guard lock(_mtx); + auto it = _map.find(tablet->tablet_id()); + // According to the implementation of `LRUCache`, `deleter` may be called after a tablet + // with same tablet id insert into cache and `TabletMap`. So we MUST check if the tablet + // instance to be erased is the same one in the map. + if (it != _map.end() && it->second.get() == tablet) { + _map.erase(it); + } + } + + std::shared_ptr<CloudTablet> get(int64_t tablet_id) { + std::lock_guard lock(_mtx); + if (auto it = _map.find(tablet_id); it != _map.end()) { + return it->second; + } + return nullptr; + } + + size_t size() { return _map.size(); } + + void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)> visitor) { + std::lock_guard lock(_mtx); + for (auto& [_, tablet] : _map) { + visitor(tablet); + } + } + +private: + std::mutex _mtx; + std::unordered_map<int64_t, std::shared_ptr<CloudTablet>> _map; +}; + +// TODO(plat1ko): Prune cache +CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) + : _engine(engine), + _tablet_map(std::make_unique<TabletMap>()), + _cache(std::make_unique<LRUCachePolicy>( + CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity, + LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {} + +CloudTabletMgr::~CloudTabletMgr() = default; + +Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, + bool warmup_data) { + // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` + struct Value { + // FIXME(plat1ko): The ownership of tablet seems to belong to 'TabletMap', while `Value` + // only requires a reference. + std::shared_ptr<CloudTablet> tablet; + TabletMap& tablet_map; + }; + + auto tablet_id_str = std::to_string(tablet_id); + CacheKey key(tablet_id_str); + auto* cache = _cache->cache(); + auto* handle = cache->lookup(key); + if (handle == nullptr) { + auto load_tablet = [this, cache, &key, + warmup_data](int64_t tablet_id) -> std::shared_ptr<CloudTablet> { + TabletMetaSharedPtr tablet_meta; + auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); + if (!st.ok()) { + LOG(WARNING) << "failed to tablet " << tablet_id << ": " << st; + return nullptr; + } + + auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta)); + auto value = std::make_unique<Value>(Value { + .tablet = tablet, + .tablet_map = *_tablet_map, + }); + // MUST sync stats to let compaction scheduler work correctly + st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), warmup_data); + if (!st.ok()) { + LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st; + return nullptr; + } + + auto deleter = [](const CacheKey& key, void* value) { + auto* value1 = reinterpret_cast<Value*>(value); + // tablet has been evicted, release it from `tablet_map` + value1->tablet_map.erase(value1->tablet.get()); + delete value1; + }; + + auto* handle = cache->insert(key, value.release(), 1, deleter); + auto ret = std::shared_ptr<CloudTablet>( + tablet.get(), [cache, handle](...) { cache->release(handle); }); + _tablet_map->put(std::move(tablet)); + return ret; + }; + + auto tablet = s_singleflight_load_tablet.load(tablet_id, std::move(load_tablet)); + if (tablet == nullptr) { + return ResultError(Status::InternalError("failed to get tablet {}", tablet_id)); + } + return tablet; + } + + CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(cache->value(handle))->tablet.get(); + auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, + [cache, handle](...) { cache->release(handle); }); + return tablet; +} + +void CloudTabletMgr::erase_tablet(int64_t tablet_id) { + auto tablet_id_str = std::to_string(tablet_id); + CacheKey key(tablet_id_str.data(), tablet_id_str.size()); + _cache->cache()->erase(key); +} + +void CloudTabletMgr::vacuum_stale_rowsets() { + LOG_INFO("begin to vacuum stale rowsets"); + std::vector<std::shared_ptr<CloudTablet>> tablets_to_vacuum; + tablets_to_vacuum.reserve(_tablet_map->size()); + _tablet_map->traverse([&tablets_to_vacuum](auto&& t) { + if (t->has_stale_rowsets()) { + tablets_to_vacuum.push_back(t); + } + }); + int num_vacuumed = 0; + for (auto& t : tablets_to_vacuum) { + num_vacuumed += t->delete_expired_stale_rowsets(); + } + LOG_INFO("finish vacuum stale rowsets").tag("num_vacuumed", num_vacuumed); +} + +std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() { + std::vector<std::weak_ptr<CloudTablet>> weak_tablets; + weak_tablets.reserve(_tablet_map->size()); + _tablet_map->traverse([&weak_tablets](auto& t) { weak_tablets.push_back(t); }); + return weak_tablets; +} + +void CloudTabletMgr::sync_tablets() { Review Comment: warning: method 'sync_tablets' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_tablet_mgr.h:49: ```diff - void sync_tablets(); + static void sync_tablets(); ``` ########## be/src/cloud/cloud_tablet_mgr.cpp: ########## @@ -0,0 +1,356 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet_mgr.h" + +#include <bthread/countdown_event.h> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/config.h" +#include "common/status.h" +#include "olap/lru_cache.h" +#include "runtime/memory/cache_policy.h" + +namespace doris { +namespace { + +// port from +// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go +template <typename Key, typename Val> +class SingleFlight { +public: + SingleFlight() = default; + + SingleFlight(const SingleFlight&) = delete; + void operator=(const SingleFlight&) = delete; + + using Loader = std::function<Val(const Key&)>; + + // Do executes and returns the results of the given function, making + // sure that only one execution is in-flight for a given key at a + // time. If a duplicate comes in, the duplicate caller waits for the + // original to complete and receives the same results. + Val load(const Key& key, Loader loader) { + std::unique_lock lock(_call_map_mtx); + + auto it = _call_map.find(key); + if (it != _call_map.end()) { + auto call = it->second; + lock.unlock(); + if (int ec = call->event.wait(); ec != 0) { + throw std::system_error(std::error_code(ec, std::system_category()), + "CountdownEvent wait failed"); + } + return call->val; + } + auto call = std::make_shared<Call>(); + _call_map.emplace(key, call); + lock.unlock(); + + call->val = loader(key); + call->event.signal(); + + lock.lock(); + _call_map.erase(key); + lock.unlock(); + + return call->val; + } + +private: + // `Call` is an in-flight or completed `load` call + struct Call { + bthread::CountdownEvent event; + Val val; + }; + + std::mutex _call_map_mtx; + std::unordered_map<Key, std::shared_ptr<Call>> _call_map; +}; + +SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>> s_singleflight_load_tablet; + +} // namespace + +// tablet_id -> cached tablet +// This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle. +// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order. +// TODO(plat1ko): multi shard to increase concurrency +class CloudTabletMgr::TabletMap { +public: + void put(std::shared_ptr<CloudTablet> tablet) { + std::lock_guard lock(_mtx); + _map[tablet->tablet_id()] = std::move(tablet); + } + + void erase(CloudTablet* tablet) { + std::lock_guard lock(_mtx); + auto it = _map.find(tablet->tablet_id()); + // According to the implementation of `LRUCache`, `deleter` may be called after a tablet + // with same tablet id insert into cache and `TabletMap`. So we MUST check if the tablet + // instance to be erased is the same one in the map. + if (it != _map.end() && it->second.get() == tablet) { + _map.erase(it); + } + } + + std::shared_ptr<CloudTablet> get(int64_t tablet_id) { + std::lock_guard lock(_mtx); + if (auto it = _map.find(tablet_id); it != _map.end()) { + return it->second; + } + return nullptr; + } + + size_t size() { return _map.size(); } + + void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)> visitor) { Review Comment: warning: method 'traverse' can be made static [readability-convert-member-functions-to-static] ```suggestion static void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)> visitor) { ``` ########## be/src/cloud/cloud_tablet_mgr.cpp: ########## @@ -0,0 +1,356 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet_mgr.h" + +#include <bthread/countdown_event.h> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/config.h" +#include "common/status.h" +#include "olap/lru_cache.h" +#include "runtime/memory/cache_policy.h" + +namespace doris { +namespace { + +// port from +// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go +template <typename Key, typename Val> +class SingleFlight { +public: + SingleFlight() = default; + + SingleFlight(const SingleFlight&) = delete; + void operator=(const SingleFlight&) = delete; + + using Loader = std::function<Val(const Key&)>; + + // Do executes and returns the results of the given function, making + // sure that only one execution is in-flight for a given key at a + // time. If a duplicate comes in, the duplicate caller waits for the + // original to complete and receives the same results. + Val load(const Key& key, Loader loader) { + std::unique_lock lock(_call_map_mtx); + + auto it = _call_map.find(key); + if (it != _call_map.end()) { + auto call = it->second; + lock.unlock(); + if (int ec = call->event.wait(); ec != 0) { + throw std::system_error(std::error_code(ec, std::system_category()), + "CountdownEvent wait failed"); + } + return call->val; + } + auto call = std::make_shared<Call>(); + _call_map.emplace(key, call); + lock.unlock(); + + call->val = loader(key); + call->event.signal(); + + lock.lock(); + _call_map.erase(key); + lock.unlock(); + + return call->val; + } + +private: + // `Call` is an in-flight or completed `load` call + struct Call { + bthread::CountdownEvent event; + Val val; + }; + + std::mutex _call_map_mtx; + std::unordered_map<Key, std::shared_ptr<Call>> _call_map; +}; + +SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>> s_singleflight_load_tablet; + +} // namespace + +// tablet_id -> cached tablet +// This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle. +// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order. +// TODO(plat1ko): multi shard to increase concurrency +class CloudTabletMgr::TabletMap { +public: + void put(std::shared_ptr<CloudTablet> tablet) { + std::lock_guard lock(_mtx); + _map[tablet->tablet_id()] = std::move(tablet); + } + + void erase(CloudTablet* tablet) { + std::lock_guard lock(_mtx); + auto it = _map.find(tablet->tablet_id()); + // According to the implementation of `LRUCache`, `deleter` may be called after a tablet + // with same tablet id insert into cache and `TabletMap`. So we MUST check if the tablet + // instance to be erased is the same one in the map. + if (it != _map.end() && it->second.get() == tablet) { + _map.erase(it); + } + } + + std::shared_ptr<CloudTablet> get(int64_t tablet_id) { + std::lock_guard lock(_mtx); + if (auto it = _map.find(tablet_id); it != _map.end()) { + return it->second; + } + return nullptr; + } + + size_t size() { return _map.size(); } + + void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)> visitor) { + std::lock_guard lock(_mtx); + for (auto& [_, tablet] : _map) { + visitor(tablet); + } + } + +private: + std::mutex _mtx; + std::unordered_map<int64_t, std::shared_ptr<CloudTablet>> _map; +}; + +// TODO(plat1ko): Prune cache +CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) + : _engine(engine), + _tablet_map(std::make_unique<TabletMap>()), + _cache(std::make_unique<LRUCachePolicy>( + CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity, + LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {} + +CloudTabletMgr::~CloudTabletMgr() = default; + +Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, + bool warmup_data) { + // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` + struct Value { + // FIXME(plat1ko): The ownership of tablet seems to belong to 'TabletMap', while `Value` + // only requires a reference. + std::shared_ptr<CloudTablet> tablet; + TabletMap& tablet_map; + }; + + auto tablet_id_str = std::to_string(tablet_id); + CacheKey key(tablet_id_str); + auto* cache = _cache->cache(); + auto* handle = cache->lookup(key); + if (handle == nullptr) { + auto load_tablet = [this, cache, &key, + warmup_data](int64_t tablet_id) -> std::shared_ptr<CloudTablet> { + TabletMetaSharedPtr tablet_meta; + auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); + if (!st.ok()) { + LOG(WARNING) << "failed to tablet " << tablet_id << ": " << st; + return nullptr; + } + + auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta)); + auto value = std::make_unique<Value>(Value { + .tablet = tablet, + .tablet_map = *_tablet_map, + }); + // MUST sync stats to let compaction scheduler work correctly + st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), warmup_data); + if (!st.ok()) { + LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st; + return nullptr; + } + + auto deleter = [](const CacheKey& key, void* value) { + auto* value1 = reinterpret_cast<Value*>(value); + // tablet has been evicted, release it from `tablet_map` + value1->tablet_map.erase(value1->tablet.get()); + delete value1; + }; + + auto* handle = cache->insert(key, value.release(), 1, deleter); + auto ret = std::shared_ptr<CloudTablet>( + tablet.get(), [cache, handle](...) { cache->release(handle); }); + _tablet_map->put(std::move(tablet)); + return ret; + }; + + auto tablet = s_singleflight_load_tablet.load(tablet_id, std::move(load_tablet)); + if (tablet == nullptr) { + return ResultError(Status::InternalError("failed to get tablet {}", tablet_id)); + } + return tablet; + } + + CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(cache->value(handle))->tablet.get(); + auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, + [cache, handle](...) { cache->release(handle); }); + return tablet; +} + +void CloudTabletMgr::erase_tablet(int64_t tablet_id) { Review Comment: warning: method 'erase_tablet' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_tablet_mgr.h:41: ```diff - void erase_tablet(int64_t tablet_id); + static void erase_tablet(int64_t tablet_id); ``` ########## be/src/cloud/cloud_tablet_mgr.cpp: ########## @@ -0,0 +1,356 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet_mgr.h" + +#include <bthread/countdown_event.h> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/config.h" +#include "common/status.h" +#include "olap/lru_cache.h" +#include "runtime/memory/cache_policy.h" + +namespace doris { +namespace { + +// port from +// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go +template <typename Key, typename Val> +class SingleFlight { +public: + SingleFlight() = default; + + SingleFlight(const SingleFlight&) = delete; + void operator=(const SingleFlight&) = delete; + + using Loader = std::function<Val(const Key&)>; + + // Do executes and returns the results of the given function, making + // sure that only one execution is in-flight for a given key at a + // time. If a duplicate comes in, the duplicate caller waits for the + // original to complete and receives the same results. + Val load(const Key& key, Loader loader) { + std::unique_lock lock(_call_map_mtx); + + auto it = _call_map.find(key); + if (it != _call_map.end()) { + auto call = it->second; + lock.unlock(); + if (int ec = call->event.wait(); ec != 0) { + throw std::system_error(std::error_code(ec, std::system_category()), + "CountdownEvent wait failed"); + } + return call->val; + } + auto call = std::make_shared<Call>(); + _call_map.emplace(key, call); + lock.unlock(); + + call->val = loader(key); + call->event.signal(); + + lock.lock(); + _call_map.erase(key); + lock.unlock(); + + return call->val; + } + +private: + // `Call` is an in-flight or completed `load` call + struct Call { + bthread::CountdownEvent event; + Val val; + }; + + std::mutex _call_map_mtx; + std::unordered_map<Key, std::shared_ptr<Call>> _call_map; +}; + +SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>> s_singleflight_load_tablet; + +} // namespace + +// tablet_id -> cached tablet +// This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle. +// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order. +// TODO(plat1ko): multi shard to increase concurrency +class CloudTabletMgr::TabletMap { +public: + void put(std::shared_ptr<CloudTablet> tablet) { + std::lock_guard lock(_mtx); + _map[tablet->tablet_id()] = std::move(tablet); + } + + void erase(CloudTablet* tablet) { + std::lock_guard lock(_mtx); + auto it = _map.find(tablet->tablet_id()); + // According to the implementation of `LRUCache`, `deleter` may be called after a tablet + // with same tablet id insert into cache and `TabletMap`. So we MUST check if the tablet + // instance to be erased is the same one in the map. + if (it != _map.end() && it->second.get() == tablet) { + _map.erase(it); + } + } + + std::shared_ptr<CloudTablet> get(int64_t tablet_id) { + std::lock_guard lock(_mtx); + if (auto it = _map.find(tablet_id); it != _map.end()) { + return it->second; + } + return nullptr; + } + + size_t size() { return _map.size(); } + + void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)> visitor) { + std::lock_guard lock(_mtx); + for (auto& [_, tablet] : _map) { + visitor(tablet); + } + } + +private: + std::mutex _mtx; + std::unordered_map<int64_t, std::shared_ptr<CloudTablet>> _map; +}; + +// TODO(plat1ko): Prune cache +CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) + : _engine(engine), + _tablet_map(std::make_unique<TabletMap>()), + _cache(std::make_unique<LRUCachePolicy>( + CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity, + LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {} + +CloudTabletMgr::~CloudTabletMgr() = default; + +Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, + bool warmup_data) { + // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` + struct Value { + // FIXME(plat1ko): The ownership of tablet seems to belong to 'TabletMap', while `Value` + // only requires a reference. + std::shared_ptr<CloudTablet> tablet; + TabletMap& tablet_map; + }; + + auto tablet_id_str = std::to_string(tablet_id); + CacheKey key(tablet_id_str); + auto* cache = _cache->cache(); + auto* handle = cache->lookup(key); + if (handle == nullptr) { + auto load_tablet = [this, cache, &key, + warmup_data](int64_t tablet_id) -> std::shared_ptr<CloudTablet> { + TabletMetaSharedPtr tablet_meta; + auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); + if (!st.ok()) { + LOG(WARNING) << "failed to tablet " << tablet_id << ": " << st; + return nullptr; + } + + auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta)); + auto value = std::make_unique<Value>(Value { + .tablet = tablet, + .tablet_map = *_tablet_map, + }); + // MUST sync stats to let compaction scheduler work correctly + st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), warmup_data); + if (!st.ok()) { + LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st; + return nullptr; + } + + auto deleter = [](const CacheKey& key, void* value) { + auto* value1 = reinterpret_cast<Value*>(value); + // tablet has been evicted, release it from `tablet_map` + value1->tablet_map.erase(value1->tablet.get()); + delete value1; + }; + + auto* handle = cache->insert(key, value.release(), 1, deleter); + auto ret = std::shared_ptr<CloudTablet>( + tablet.get(), [cache, handle](...) { cache->release(handle); }); + _tablet_map->put(std::move(tablet)); + return ret; + }; + + auto tablet = s_singleflight_load_tablet.load(tablet_id, std::move(load_tablet)); + if (tablet == nullptr) { + return ResultError(Status::InternalError("failed to get tablet {}", tablet_id)); + } + return tablet; + } + + CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(cache->value(handle))->tablet.get(); + auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, + [cache, handle](...) { cache->release(handle); }); + return tablet; +} + +void CloudTabletMgr::erase_tablet(int64_t tablet_id) { + auto tablet_id_str = std::to_string(tablet_id); + CacheKey key(tablet_id_str.data(), tablet_id_str.size()); + _cache->cache()->erase(key); +} + +void CloudTabletMgr::vacuum_stale_rowsets() { + LOG_INFO("begin to vacuum stale rowsets"); + std::vector<std::shared_ptr<CloudTablet>> tablets_to_vacuum; + tablets_to_vacuum.reserve(_tablet_map->size()); + _tablet_map->traverse([&tablets_to_vacuum](auto&& t) { + if (t->has_stale_rowsets()) { + tablets_to_vacuum.push_back(t); + } + }); + int num_vacuumed = 0; + for (auto& t : tablets_to_vacuum) { + num_vacuumed += t->delete_expired_stale_rowsets(); + } + LOG_INFO("finish vacuum stale rowsets").tag("num_vacuumed", num_vacuumed); +} + +std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() { + std::vector<std::weak_ptr<CloudTablet>> weak_tablets; + weak_tablets.reserve(_tablet_map->size()); + _tablet_map->traverse([&weak_tablets](auto& t) { weak_tablets.push_back(t); }); + return weak_tablets; +} + +void CloudTabletMgr::sync_tablets() { + LOG_INFO("begin to sync tablets"); + int64_t last_sync_time_bound = ::time(nullptr) - config::tablet_sync_interval_seconds; + + auto weak_tablets = get_weak_tablets(); + + // sort by last_sync_time + static auto cmp = [](const auto& a, const auto& b) { return a.first < b.first; }; + std::multiset<std::pair<int64_t, std::weak_ptr<CloudTablet>>, decltype(cmp)> + sync_time_tablet_set(cmp); + + for (auto& weak_tablet : weak_tablets) { + if (auto tablet = weak_tablet.lock()) { + if (tablet->tablet_state() != TABLET_RUNNING) { + continue; + } + int64_t last_sync_time = tablet->last_sync_time_s; + if (last_sync_time <= last_sync_time_bound) { + sync_time_tablet_set.emplace(last_sync_time, weak_tablet); + } + } + } + + int num_sync = 0; + for (auto&& [_, weak_tablet] : sync_time_tablet_set) { + if (auto tablet = weak_tablet.lock()) { + if (tablet->last_sync_time_s > last_sync_time_bound) { + continue; + } + + ++num_sync; + auto st = tablet->sync_meta(); + if (!st) { + LOG_WARNING("failed to sync tablet meta {}", tablet->tablet_id()).error(st); + if (st.is<ErrorCode::NOT_FOUND>()) { + continue; + } + } + + st = tablet->sync_rowsets(-1); + if (!st) { + LOG_WARNING("failed to sync tablet rowsets {}", tablet->tablet_id()).error(st); + } + } + } + LOG_INFO("finish sync tablets").tag("num_sync", num_sync); +} + +Status CloudTabletMgr::get_topn_tablets_to_compact( Review Comment: warning: method 'get_topn_tablets_to_compact' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status CloudTabletMgr::get_topn_tablets_to_compact( ``` ########## be/src/cloud/cloud_tablet_mgr.cpp: ########## @@ -0,0 +1,356 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet_mgr.h" + +#include <bthread/countdown_event.h> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/config.h" +#include "common/status.h" +#include "olap/lru_cache.h" +#include "runtime/memory/cache_policy.h" + +namespace doris { +namespace { + +// port from +// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go +template <typename Key, typename Val> +class SingleFlight { +public: + SingleFlight() = default; + + SingleFlight(const SingleFlight&) = delete; + void operator=(const SingleFlight&) = delete; + + using Loader = std::function<Val(const Key&)>; + + // Do executes and returns the results of the given function, making + // sure that only one execution is in-flight for a given key at a + // time. If a duplicate comes in, the duplicate caller waits for the + // original to complete and receives the same results. + Val load(const Key& key, Loader loader) { + std::unique_lock lock(_call_map_mtx); + + auto it = _call_map.find(key); + if (it != _call_map.end()) { + auto call = it->second; + lock.unlock(); + if (int ec = call->event.wait(); ec != 0) { + throw std::system_error(std::error_code(ec, std::system_category()), + "CountdownEvent wait failed"); + } + return call->val; + } + auto call = std::make_shared<Call>(); + _call_map.emplace(key, call); + lock.unlock(); + + call->val = loader(key); + call->event.signal(); + + lock.lock(); + _call_map.erase(key); + lock.unlock(); + + return call->val; + } + +private: + // `Call` is an in-flight or completed `load` call + struct Call { + bthread::CountdownEvent event; + Val val; + }; + + std::mutex _call_map_mtx; + std::unordered_map<Key, std::shared_ptr<Call>> _call_map; +}; + +SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>> s_singleflight_load_tablet; + +} // namespace + +// tablet_id -> cached tablet +// This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle. +// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order. +// TODO(plat1ko): multi shard to increase concurrency +class CloudTabletMgr::TabletMap { +public: + void put(std::shared_ptr<CloudTablet> tablet) { + std::lock_guard lock(_mtx); + _map[tablet->tablet_id()] = std::move(tablet); + } + + void erase(CloudTablet* tablet) { + std::lock_guard lock(_mtx); + auto it = _map.find(tablet->tablet_id()); + // According to the implementation of `LRUCache`, `deleter` may be called after a tablet + // with same tablet id insert into cache and `TabletMap`. So we MUST check if the tablet + // instance to be erased is the same one in the map. + if (it != _map.end() && it->second.get() == tablet) { + _map.erase(it); + } + } + + std::shared_ptr<CloudTablet> get(int64_t tablet_id) { + std::lock_guard lock(_mtx); + if (auto it = _map.find(tablet_id); it != _map.end()) { + return it->second; + } + return nullptr; + } + + size_t size() { return _map.size(); } + + void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)> visitor) { + std::lock_guard lock(_mtx); + for (auto& [_, tablet] : _map) { + visitor(tablet); + } + } + +private: + std::mutex _mtx; + std::unordered_map<int64_t, std::shared_ptr<CloudTablet>> _map; +}; + +// TODO(plat1ko): Prune cache +CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) + : _engine(engine), + _tablet_map(std::make_unique<TabletMap>()), + _cache(std::make_unique<LRUCachePolicy>( + CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity, + LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {} + +CloudTabletMgr::~CloudTabletMgr() = default; + +Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, + bool warmup_data) { + // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` + struct Value { + // FIXME(plat1ko): The ownership of tablet seems to belong to 'TabletMap', while `Value` + // only requires a reference. + std::shared_ptr<CloudTablet> tablet; + TabletMap& tablet_map; + }; + + auto tablet_id_str = std::to_string(tablet_id); + CacheKey key(tablet_id_str); + auto* cache = _cache->cache(); + auto* handle = cache->lookup(key); + if (handle == nullptr) { + auto load_tablet = [this, cache, &key, + warmup_data](int64_t tablet_id) -> std::shared_ptr<CloudTablet> { + TabletMetaSharedPtr tablet_meta; + auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); + if (!st.ok()) { + LOG(WARNING) << "failed to tablet " << tablet_id << ": " << st; + return nullptr; + } + + auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta)); + auto value = std::make_unique<Value>(Value { + .tablet = tablet, + .tablet_map = *_tablet_map, + }); + // MUST sync stats to let compaction scheduler work correctly + st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), warmup_data); + if (!st.ok()) { + LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st; + return nullptr; + } + + auto deleter = [](const CacheKey& key, void* value) { + auto* value1 = reinterpret_cast<Value*>(value); + // tablet has been evicted, release it from `tablet_map` + value1->tablet_map.erase(value1->tablet.get()); + delete value1; + }; + + auto* handle = cache->insert(key, value.release(), 1, deleter); + auto ret = std::shared_ptr<CloudTablet>( + tablet.get(), [cache, handle](...) { cache->release(handle); }); + _tablet_map->put(std::move(tablet)); + return ret; + }; + + auto tablet = s_singleflight_load_tablet.load(tablet_id, std::move(load_tablet)); + if (tablet == nullptr) { + return ResultError(Status::InternalError("failed to get tablet {}", tablet_id)); + } + return tablet; + } + + CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(cache->value(handle))->tablet.get(); + auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, + [cache, handle](...) { cache->release(handle); }); + return tablet; +} + +void CloudTabletMgr::erase_tablet(int64_t tablet_id) { + auto tablet_id_str = std::to_string(tablet_id); + CacheKey key(tablet_id_str.data(), tablet_id_str.size()); + _cache->cache()->erase(key); +} + +void CloudTabletMgr::vacuum_stale_rowsets() { Review Comment: warning: method 'vacuum_stale_rowsets' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_tablet_mgr.h:43: ```diff - void vacuum_stale_rowsets(); + static void vacuum_stale_rowsets(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
