freemandealer commented on code in PR #49456:
URL: https://github.com/apache/doris/pull/49456#discussion_r2162911732
##########
be/src/io/cache/block_file_cache.cpp:
##########
@@ -2097,6 +2160,367 @@ void BlockFileCache::update_ttl_atime(const
UInt128Wrapper& hash) {
};
}
+// Maps a cache type to the LRU log queue that buffers events for that type.
+// Any type other than INDEX/DISPOSABLE/TTL/NORMAL trips a DCHECK and, when the
+// check is compiled out, falls back to the NORMAL queue.
+BlockFileCache::CacheLRULogQueue& BlockFileCache::get_lru_log_queue(FileCacheType type) {
+    if (type == FileCacheType::INDEX) {
+        return _index_lru_log_queue;
+    }
+    if (type == FileCacheType::DISPOSABLE) {
+        return _disposable_lru_log_queue;
+    }
+    if (type == FileCacheType::TTL) {
+        return _ttl_lru_log_queue;
+    }
+    if (type != FileCacheType::NORMAL) {
+        // Unexpected cache type: flag it in debug builds, then use the NORMAL queue.
+        DCHECK(false);
+    }
+    return _normal_lru_log_queue;
+}
+
+// Appends one LRU event (its type plus the block's hash/offset/size) to the
+// back of `log_queue`.
+void BlockFileCache::record_queue_event(CacheLRULogQueue& log_queue, CacheLRULogType log_type,
+                                        const UInt128Wrapper hash, const size_t offset,
+                                        const size_t size) {
+    auto event = std::make_unique<CacheLRULog>(log_type, hash, offset, size);
+    log_queue.push_back(std::move(event));
+}
+
+// Drains `log_queue` front to back and applies every event to `shadow_queue`.
+// Events whose target entry is missing, or whose type is unknown, are logged
+// and dropped; exceptions thrown while applying an event are logged as well.
+void BlockFileCache::replay_queue_event(CacheLRULogQueue& log_queue, LRUQueue& shadow_queue) {
+    // The shadow queue does not need the real cache lock, but it still needs a
+    // lock of its own to prevent read/write contention.
+    std::lock_guard<std::mutex> lru_log_lock(_mutex_lru_log);
+    while (!log_queue.empty()) {
+        // Detach the oldest event before applying it, so it is consumed even
+        // when applying it throws.
+        auto event = std::move(log_queue.front());
+        log_queue.pop_front();
+        try {
+            const auto kind = event->type;
+            if (kind == CacheLRULogType::ADD) {
+                shadow_queue.add(event->hash, event->offset, event->size, lru_log_lock);
+            } else if (kind == CacheLRULogType::REMOVE) {
+                auto pos = shadow_queue.get(event->hash, event->offset, lru_log_lock);
+                if (pos == shadow_queue.end()) {
+                    LOG(WARNING) << "REMOVE failed, doesn't exist in shadow queue";
+                } else {
+                    shadow_queue.remove(pos, lru_log_lock);
+                }
+            } else if (kind == CacheLRULogType::MOVETOBACK) {
+                auto pos = shadow_queue.get(event->hash, event->offset, lru_log_lock);
+                if (pos == shadow_queue.end()) {
+                    LOG(WARNING) << "MOVETOBACK failed, doesn't exist in shadow queue";
+                } else {
+                    shadow_queue.move_to_end(pos, lru_log_lock);
+                }
+            } else {
+                LOG(WARNING) << "Unknown CacheLRULogType: " << static_cast<int>(kind);
+            }
+        } catch (const std::exception& e) {
+            LOG(WARNING) << "Failed to replay queue event: " << e.what();
+        }
+    }
+}
+
+// Background loop: once per configured interval, replays the pending LRU log
+// of each cache type into its shadow queue. The loop ends once `_close` is set.
+void BlockFileCache::run_background_lru_log_replay() {
+    // Waits up to one replay interval on `_close_cv` and reports whether the
+    // cache is closing afterwards.
+    auto wait_then_check_closed = [this]() {
+        const int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
+        std::unique_lock close_lock(_close_mtx);
+        _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
+        return static_cast<bool>(_close);
+    };
+
+    while (!_close && !wait_then_check_closed()) {
+        replay_queue_event(_ttl_lru_log_queue, _shadow_ttl_queue);
+        replay_queue_event(_index_lru_log_queue, _shadow_index_queue);
+        replay_queue_event(_normal_lru_log_queue, _shadow_normal_queue);
+        replay_queue_event(_disposable_lru_log_queue, _shadow_disposable_queue);
+
+        //TODO(zhengyu): add debug facilities to check diff between real and shadow queue
+    }
+}
+
+// Verifies that `out` is still in a good state after a write.
+// Returns OK when it is. Otherwise describes the stream's error bits, closes
+// the stream, logs a warning that names `filename`, and returns an
+// InternalError carrying the same message.
+Status BlockFileCache::check_ofstream_status(std::ofstream& out, std::string& filename) {
+    if (out.good()) {
+        return Status::OK();
+    }
+
+    const std::ios::iostate bits = out.rdstate();
+    std::stringstream reason;
+    if (bits & std::ios::eofbit) {
+        reason << "End of file reached.";
+    }
+    if (bits & std::ios::failbit) {
+        reason << "Input/output operation failed, err_code: " << strerror(errno);
+    }
+    if (bits & std::ios::badbit) {
+        reason << "Serious I/O error occurred, err_code: " << strerror(errno);
+    }
+    out.close();
+
+    std::string warn_msg =
+            fmt::format("dump lru writing failed, file={}, {}", filename, reason.str().c_str());
+    LOG(WARNING) << warn_msg;
+    return Status::InternalError<false>(warn_msg);
+}
+
+// Writes a single LRU entry (hash, offset, size) to `out` and then returns the
+// stream status check for `filename`. The on-disk encoding is chosen at compile
+// time: raw bytes by default, or one serialized protobuf message per entry when
+// USE_PROTOBUF_DUMP is defined.
+Status BlockFileCache::dump_one_lru_entry(std::ofstream& out, std::string& filename,
+                                          const UInt128Wrapper& hash, size_t offset, size_t size) {
+#ifndef USE_PROTOBUF_DUMP
+    // The big win of the raw dump is that it saves 18.75% of the dump size;
+    // with the version stored in the footer, upgrades remain easy.
+    const auto write_raw = [&out](const auto& field) {
+        out.write(reinterpret_cast<const char*>(&field), sizeof(field));
+    };
+    write_raw(hash);
+    write_raw(offset);
+    write_raw(size);
+#else
+    // Why not use protobuf for the dump as a whole?
+    // AFAIK, the current protobuf version does not support a streaming mode,
+    // so the entire message would have to be kept in memory, which would
+    // consume a lot of RAM.
+    // Instead, each single entry is serialized with protobuf on its own, and
+    // the version field in the footer is provided for upgrades.
+    doris::io::cache::LruDumpEntryPB entry_pb;
+    auto* hash_field = entry_pb.mutable_hash();
+    hash_field->set_high(hash.high());
+    hash_field->set_low(hash.low());
+    entry_pb.set_offset(offset);
+    entry_pb.set_size(size);
+
+    std::string payload;
+    if (!entry_pb.SerializeToString(&payload)) {
+        std::string warn_msg =
+                fmt::format("dump lru serialize failed, file={}, hash={}, offset={}", filename,
+                            hash.to_string(), offset);
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    out.write(payload.data(), payload.size());
+#endif
+    return check_ofstream_status(out, filename);
+}
+
+// Finishes an LRU dump: appends the footer, closes the stream, and renames the
+// temporary file to its final name.
+//
+// out            stream the entries were written to (backing `tmp_filename`)
+// entry_num      number of entries written, stored in the footer
+// tmp_filename   path of the temporary dump file
+// final_filename path the dump is renamed to on success
+// file_size      out-param: size of the final file after a successful rename
+//                (0 if the size cannot be queried)
+//
+// Returns an error if writing the footer, closing the stream, or the rename
+// fails. A failed rename also removes the temporary file.
+Status BlockFileCache::finalize_dump(std::ofstream& out, size_t entry_num,
+                                     std::string& tmp_filename, std::string& final_filename,
+                                     size_t& file_size) {
+    // write footer: size_t entry_num, version, magic
+    // (12 bytes in total when size_t is 8 bytes wide)
+    int8_t version = 1;
+    std::string magic_str = "DOR";
+    out.write(reinterpret_cast<const char*>(&entry_num), sizeof(entry_num));
+    out.write(reinterpret_cast<const char*>(&version), sizeof(version));
+    out.write(magic_str.c_str(), magic_str.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+    out.close();
+    // close() flushes buffered data and sets failbit on failure, so check again
+    // before publishing the file under its final name.
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    // rename tmp to formal file
+    if (std::rename(tmp_filename.c_str(), final_filename.c_str()) != 0) {
+        // Build the message first: std::remove may overwrite errno.
+        std::string warn_msg =
+                fmt::format("dump lru rename failed, tmp={}, final={}, err={}", tmp_filename,
+                            final_filename, strerror(errno));
+        std::remove(tmp_filename.c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    // Only a successfully renamed file has a meaningful size. Use the
+    // non-throwing overload so a stat failure cannot escape as an exception.
+    std::error_code ec;
+    const auto dumped_size = std::filesystem::file_size(final_filename, ec);
+    if (ec) {
+        LOG(WARNING) << "dump lru get file size failed, file=" << final_filename
+                     << ", err=" << ec.message();
+        file_size = 0;
+    } else {
+        file_size = dumped_size;
+    }
+    return Status::OK();
+}
+
+void BlockFileCache::run_background_lru_dump() {
+ while (!_close) {
+ int64_t interval_ms =
config::file_cache_background_lru_dump_interval_ms;
+ {
+ std::unique_lock close_lock(_close_mtx);
+ _close_cv.wait_for(close_lock,
std::chrono::milliseconds(interval_ms));
+ if (_close) {
+ break;
+ }
+ }
+
+ auto dump_queue = [&](LRUQueue& queue, const std::string& queue_name) {
+ Status st;
+ std::vector<std::tuple<UInt128Wrapper, size_t, size_t>> elements;
+
elements.reserve(config::file_cache_background_lru_dump_tail_record_num);
+
+ {
+ std::lock_guard<std::mutex> lru_log_lock(_mutex_lru_log);
+ size_t count = 0;
+ for (const auto& [hash, offset, size] : queue) {
+ if (count++ >=
config::file_cache_background_lru_dump_tail_record_num) break;
+ elements.emplace_back(hash, offset, size);
+ }
+ }
+
+ // Write to disk
+ int64_t duration_ns = 0;
+ std::uintmax_t file_size = 0;
+ {
+ SCOPED_RAW_TIMER(&duration_ns);
Review Comment:
No, there is no header, but the footer will suffice.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]