This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 01b9e620034 [improvement](file cache) Try to read from remote storage
when opening segment with CachedRemoteFileReader (#38645)
01b9e620034 is described below
commit 01b9e62003408b10d01cd285616dd7c490e3c87c
Author: Gavin Chou <[email protected]>
AuthorDate: Tue Aug 6 12:08:08 2024 +0800
[improvement](file cache) Try to read from remote storage when opening
segment with CachedRemoteFileReader (#38645)
We may encounter the error "Bad segment file" in some rare cases where
the file cache may not hold the correct content of a segment file. In that case
we should read the original segment file from remote storage to increase robustness.
---
be/src/cloud/injection_point_action.cpp | 19 +++++++-
be/src/olap/rowset/segment_v2/segment.cpp | 75 ++++++++++++++++++++++++++-----
2 files changed, 81 insertions(+), 13 deletions(-)
diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp
index d5a13238837..be90ee23afd 100644
--- a/be/src/cloud/injection_point_action.cpp
+++ b/be/src/cloud/injection_point_action.cpp
@@ -108,6 +108,22 @@ void register_suites() {
sp->set_call_back("VOlapTableSink::close",
[](auto&&) { std::this_thread::sleep_for(std::chrono::seconds(5)); });
});
+ suite_map.emplace("test_file_segment_cache_corruption", [] {
+ auto* sp = SyncPoint::get_instance();
+ sp->set_call_back("Segment::open:corruption", [](auto&& args) {
+ LOG(INFO) << "injection Segment::open:corruption";
+ auto* arg0 = try_any_cast<Status*>(args[0]);
+ *arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error");
+ });
+ });
+ suite_map.emplace("test_file_segment_cache_corruption1", [] {
+ auto* sp = SyncPoint::get_instance();
+ sp->set_call_back("Segment::open:corruption1", [](auto&& args) {
+ LOG(INFO) << "injection Segment::open:corruption1";
+ auto* arg0 = try_any_cast<Status*>(args[0]);
+ *arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error");
+ });
+ });
}
void set_sleep(const std::string& point, HttpRequest* req) {
@@ -215,6 +231,7 @@ void handle_clear(HttpRequest* req) {
void handle_clear(HttpRequest* req) {
const auto& point = req->param("name");
auto* sp = SyncPoint::get_instance();
+ LOG(INFO) << "clear injection point : " << (point.empty() ? "(all points)" : point);
if (point.empty()) {
// If point name is emtpy, clear all
sp->clear_all_call_backs();
@@ -257,7 +274,7 @@ void handle_disable(HttpRequest* req) {
InjectionPointAction::InjectionPointAction() = default;
void InjectionPointAction::handle(HttpRequest* req) {
- LOG(INFO) << req->debug_string();
+ LOG(INFO) << "handle InjectionPointAction " << req->debug_string();
auto& op = req->param("op");
if (op == "set") {
handle_set(req);
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index 8d862528de6..1b31117f126 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -28,7 +28,9 @@
#include "common/logging.h"
#include "common/status.h"
+#include "cpp/sync_point.h"
#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/io_common.h"
@@ -77,6 +79,15 @@ namespace doris::segment_v2 {
static bvar::Adder<size_t> g_total_segment_num("doris_total_segment_num");
class InvertedIndexIterator;
+io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
+ std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0
+ return io::BlockFileCache::hash(base);
+}
+
+std::string file_cache_key_str(const std::string& seg_path) {
+ return file_cache_key_from_path(seg_path).to_string();
+}
+
Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id,
RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options,
@@ -84,9 +95,43 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t se
io::FileReaderSPtr file_reader;
RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
std::shared_ptr<Segment> segment(new Segment(segment_id, rowset_id, std::move(tablet_schema)));
- segment->_fs = std::move(fs);
+ segment->_fs = fs;
segment->_file_reader = std::move(file_reader);
- RETURN_IF_ERROR(segment->_open());
+ auto st = segment->_open();
+ TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st);
+ if (st.is<ErrorCode::CORRUPTION>() &&
+ reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) {
+ LOG(WARNING) << "bad segment file may be read from file cache, try to read remote source "
+ "file directly, file path: "
+ << path << " cache_key: " << file_cache_key_str(path);
+ auto file_key = file_cache_key_from_path(path);
+ auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
+ file_cache->remove_if_cached(file_key);
+
+ RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
+ segment->_file_reader = std::move(file_reader);
+ st = segment->_open();
+ TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st);
+ if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again
+ LOG(WARNING) << "failed to try to read remote source file again with cache support,"
+ << " try to read from remote directly, "
+ << " file path: " << path << " cache_key: " << file_cache_key_str(path);
+ file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
+ file_cache->remove_if_cached(file_key);
+
+ io::FileReaderOptions opt = reader_options;
+ opt.cache_type = io::FileCachePolicy::NO_CACHE; // skip cache
+ RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt));
+ segment->_file_reader = std::move(file_reader);
+ st = segment->_open();
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to try to read remote source file directly,"
+ << " file path: " << path
+ << " cache_key: " << file_cache_key_str(path);
+ }
+ }
+ }
+ RETURN_IF_ERROR(st);
*output = std::move(segment);
return Status::OK();
}
@@ -256,8 +301,9 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
// Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
auto file_size = _file_reader->size();
if (file_size < 12) {
- return Status::Corruption("Bad segment file {}: file size {} < 12",
- _file_reader->path().native(), file_size);
+ return Status::Corruption("Bad segment file {}: file size {} < 12, cache_key: {}",
+ _file_reader->path().native(), file_size,
+ file_cache_key_str(_file_reader->path().native()));
}
uint8_t fixed_buf[12];
@@ -269,15 +315,17 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
DCHECK_EQ(bytes_read, 12);
if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) {
- return Status::Corruption("Bad segment file {}: magic number not match",
- _file_reader->path().native());
+ return Status::Corruption("Bad segment file {}: magic number not match, cache_key: {}",
+ _file_reader->path().native(),
+ file_cache_key_str(_file_reader->path().native()));
}
// read footer PB
uint32_t footer_length = decode_fixed32_le(fixed_buf);
if (file_size < 12 + footer_length) {
- return Status::Corruption("Bad segment file {}: file size {} < {}",
- _file_reader->path().native(), file_size, 12 + footer_length);
+ return Status::Corruption("Bad segment file {}: file size {} < {}, cache_key: {}",
+ _file_reader->path().native(), file_size, 12 + footer_length,
+ file_cache_key_str(_file_reader->path().native()));
}
std::string footer_buf;
@@ -291,14 +339,17 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
uint32_t actual_checksum = crc32c::Value(footer_buf.data(), footer_buf.size());
if (actual_checksum != expect_checksum) {
return Status::Corruption(
- "Bad segment file {}: footer checksum not match, actual={} vs expect={}",
- _file_reader->path().native(), actual_checksum, expect_checksum);
+ "Bad segment file {}: footer checksum not match, actual={} vs expect={}, "
+ "cache_key: {}",
+ _file_reader->path().native(), actual_checksum, expect_checksum,
+ file_cache_key_str(_file_reader->path().native()));
}
// deserialize footer PB
if (!footer->ParseFromString(footer_buf)) {
- return Status::Corruption("Bad segment file {}: failed to parse SegmentFooterPB",
- _file_reader->path().native());
+ return Status::Corruption(
+ "Bad segment file {}: failed to parse SegmentFooterPB, cache_key: {}",
+ _file_reader->path().native(), file_cache_key_str(_file_reader->path().native()));
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]