deardeng commented on code in PR #56384:
URL: https://github.com/apache/doris/pull/56384#discussion_r2478042717
##########
be/src/io/cache/cached_remote_file_reader.cpp:
##########
@@ -131,6 +150,124 @@ std::pair<size_t, size_t>
CachedRemoteFileReader::s_align_size(size_t offset, si
return std::make_pair(align_left, align_size);
}
+namespace {
+std::optional<int64_t> extract_tablet_id(const std::string& file_path) {
+ return StorageResource::parse_tablet_id_from_path(file_path);
+}
+
+// Get peer connection info from tablet_id
+std::pair<std::string, int> get_peer_connection_info(const std::string&
file_path) {
+ std::string host = "";
+ int port = 0;
+
+ // Try to get tablet_id from actual path and lookup tablet info
+ if (auto tablet_id = extract_tablet_id(file_path)) {
+ auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+ if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) {
+ host = tablet_info->first;
+ port = tablet_info->second;
+ } else {
+ LOG_WARNING("get peer connection info not found")
+ .tag("tablet_id", *tablet_id)
+ .tag("file_path", file_path);
+ }
+ } else {
+ LOG_WARNING("parse tablet id from path failed")
+ .tag("tablet_id", "null")
+ .tag("file_path", file_path);
+ }
+
+ DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
+ host = dp->param<std::string>("host", "127.0.0.1");
+ port = dp->param("port", 9060);
+ LOG_WARNING("debug point
PeerFileCacheReader::_fetch_from_peer_cache_blocks")
+ .tag("host", host)
+ .tag("port", port);
+ });
+
+ return {host, port};
+}
+
+// Execute peer read with fallback to S3
+Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks,
size_t empty_start,
+ size_t& size, std::unique_ptr<char[]>& buffer,
+ const std::string& file_path, bool is_doris_table,
ReadStatistics& stats,
+ const IOContext* io_ctx) {
+ SCOPED_RAW_TIMER(&stats.remote_read_timer);
+
+ auto [host, port] = get_peer_connection_info(file_path);
+ VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ",
port=" << port
+ << ", file_path=" << file_path;
+
+ if (host.empty() || port == 0) {
+ LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is
empty"
+ << ", host=" << host << ", port=" << port
+ << ", file_path=" << file_path;
+ return Status::InternalError("host or port is empty");
+ }
+ peer_read_counter << 1;
+ PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
+ auto st = peer_reader.fetch_blocks(empty_blocks, empty_start,
Slice(buffer.get(), size), &size,
+ io_ctx);
+ if (!st.ok()) {
+ LOG_WARNING("PeerFileCacheReader read from peer failed")
+ .tag("host", host)
+ .tag("port", port)
+ .tag("error", st.msg());
+ }
+ return st;
+}
+
+// Execute S3 read
+Status execute_s3_read(size_t empty_start, size_t& size,
std::unique_ptr<char[]>& buffer,
+ ReadStatistics& stats, const IOContext* io_ctx,
+ FileReaderSPtr remote_file_reader) {
+ s3_read_counter << 1;
+ SCOPED_RAW_TIMER(&stats.remote_read_timer);
+ return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size),
&size, io_ctx);
+}
+
+} // anonymous namespace
+
+Status CachedRemoteFileReader::_execute_remote_read(const
std::vector<FileBlockSPtr>& empty_blocks,
+ size_t empty_start,
size_t& size,
+ std::unique_ptr<char[]>&
buffer,
+ ReadStatistics& stats,
+ const IOContext* io_ctx) {
+ DBUG_EXECUTE_IF("CachedRemoteFileReader.read_at_impl.change_type", {
+ // Determine read type from debug point or default to S3
+ std::string read_type = "s3";
+ read_type = dp->param<std::string>("type", "s3");
+ LOG_WARNING("CachedRemoteFileReader.read_at_impl.change_type")
+ .tag("path", path().native())
+ .tag("off", empty_start)
+ .tag("size", size)
+ .tag("type", read_type);
+ // Execute appropriate read strategy
+ if (read_type == "s3") {
+ return execute_s3_read(empty_start, size, buffer, stats, io_ctx,
_remote_file_reader);
+ } else {
+ return execute_peer_read(empty_blocks, empty_start, size, buffer,
path().native(),
+ _is_doris_table, stats, io_ctx);
+ }
+ });
+
+ if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) {
+ return execute_s3_read(empty_start, size, buffer, stats, io_ctx,
_remote_file_reader);
+ } else {
+ // first try peer read, if peer failed, fallback to S3
+ // peer timeout is 5 seconds
+ // TODO(dx): here peer and s3 reader need to get data in parallel, and
take the one that is correct and returns first
+ auto st = execute_peer_read(empty_blocks, empty_start, size, buffer,
path().native(),
Review Comment:
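enable_cache_read_from_peer 开启时，如果拿不到 peer 的信息，peer 读会立即失败（随后回退到 S3 读）。
暂时没有其他辅助条件，后面再优化这个逻辑。
(English: with enable_cache_read_from_peer enabled, if no peer information is available the peer read fails immediately and the read then falls back to S3. There is no other guarding condition for now; this logic will be optimized later.)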
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org