This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 57729bad68 [Enhancement](multi-catalog) Add hdfs read statistics
profile. (#21442)
57729bad68 is described below
commit 57729bad6841ea9728e6b2cf0bd484133e7b9ead
Author: Qi Chen <[email protected]>
AuthorDate: Fri Jul 7 14:52:14 2023 +0800
[Enhancement](multi-catalog) Add hdfs read statistics profile. (#21442)
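Add an HDFS read statistics profile: when an HdfsFileReader is closed, the counters returned by hdfsFileGetReadStatistics() are reported under an "HdfsIO" entry of the runtime profile (only when built with USE_HADOOP_HDFS and a non-null profile is supplied). Example output: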
```
- HdfsIO: 0ns
- TotalBytesRead: 133.47 MB
- TotalLocalBytesRead: 133.47 MB
- TotalShortCircuitBytesRead: 133.47 MB
- TotalZeroCopyBytesRead: 0.00
```
---
be/src/io/file_factory.cpp | 9 ++++---
be/src/io/file_factory.h | 3 ++-
be/src/io/fs/benchmark/hdfs_benchmark.hpp | 10 ++++----
be/src/io/fs/hdfs_file_reader.cpp | 40 ++++++++++++++++++++++++++++--
be/src/io/fs/hdfs_file_reader.h | 16 +++++++++++-
be/src/io/fs/hdfs_file_system.cpp | 12 +++++----
be/src/io/fs/hdfs_file_system.h | 7 ++++--
be/src/runtime/snapshot_loader.cpp | 2 +-
be/src/vec/runtime/vfile_result_writer.cpp | 2 +-
9 files changed, 79 insertions(+), 22 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index af32c81982..686c9a1b33 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -97,7 +97,7 @@ Status FileFactory::create_file_writer(TFileType::type type,
ExecEnv* env,
case TFileType::FILE_HDFS: {
THdfsParams hdfs_params = parse_properties(properties);
std::shared_ptr<io::HdfsFileSystem> fs;
- RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr,
&fs));
RETURN_IF_ERROR(fs->create_file(path, &file_writer));
break;
}
@@ -128,7 +128,7 @@ Status FileFactory::create_file_reader(const
FileSystemProperties& system_proper
}
case TFileType::FILE_HDFS: {
RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params,
file_description.path,
- file_system, file_reader,
reader_options));
+ file_system, file_reader,
reader_options, profile));
break;
}
case TFileType::FILE_BROKER: {
@@ -168,9 +168,10 @@ Status FileFactory::create_pipe_reader(const TUniqueId&
load_id, io::FileReaderS
Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const
std::string& path,
std::shared_ptr<io::FileSystem>*
hdfs_file_system,
io::FileReaderSPtr* reader,
- const io::FileReaderOptions&
reader_options) {
+ const io::FileReaderOptions&
reader_options,
+ RuntimeProfile* profile) {
std::shared_ptr<io::HdfsFileSystem> fs;
- RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs));
RETURN_IF_ERROR(fs->open_file(path, reader_options, reader));
*hdfs_file_system = std::move(fs);
return Status::OK();
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index b5cbcdfc7c..0f9dee60d0 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -81,7 +81,8 @@ public:
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const
std::string& path,
std::shared_ptr<io::FileSystem>*
hdfs_file_system,
io::FileReaderSPtr* reader,
- const io::FileReaderOptions&
reader_options);
+ const io::FileReaderOptions&
reader_options,
+ RuntimeProfile* profile);
static Status create_s3_reader(const std::map<std::string, std::string>&
prop,
const std::string& path,
diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp
b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
index b508e14a24..d6ad059b77 100644
--- a/be/src/io/fs/benchmark/hdfs_benchmark.hpp
+++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
@@ -50,8 +50,8 @@ public:
io::FileReaderSPtr reader;
io::FileReaderOptions reader_opts =
FileFactory::get_reader_options(nullptr);
THdfsParams hdfs_params = parse_properties(_conf_map);
- RETURN_IF_ERROR(
- FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs,
&reader, reader_opts));
+ RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params,
file_path, &fs, &reader,
+ reader_opts, nullptr));
auto end = std::chrono::high_resolution_clock::now();
auto elapsed_seconds =
std::chrono::duration_cast<std::chrono::duration<double>>(end
- start);
@@ -94,7 +94,7 @@ public:
std::shared_ptr<io::HdfsFileSystem> fs;
io::FileWriterPtr writer;
THdfsParams hdfs_params = parse_properties(_conf_map);
- RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr,
&fs));
RETURN_IF_ERROR(fs->create_file(file_path, &writer));
return write(state, writer.get());
}
@@ -115,7 +115,7 @@ public:
auto new_file_path = file_path + "_new";
THdfsParams hdfs_params = parse_properties(_conf_map);
std::shared_ptr<io::HdfsFileSystem> fs;
- RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr,
&fs));
auto start = std::chrono::high_resolution_clock::now();
RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
@@ -142,7 +142,7 @@ public:
std::shared_ptr<io::HdfsFileSystem> fs;
THdfsParams hdfs_params = parse_properties(_conf_map);
- RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr,
&fs));
auto start = std::chrono::high_resolution_clock::now();
bool res = false;
diff --git a/be/src/io/fs/hdfs_file_reader.cpp
b/be/src/io/fs/hdfs_file_reader.cpp
index 402cdb3faf..cf3a2b6563 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -36,12 +36,29 @@ namespace doris {
namespace io {
HdfsFileReader::HdfsFileReader(Path path, const std::string& name_node,
- FileHandleCache::Accessor accessor)
- : _path(std::move(path)), _name_node(name_node),
_accessor(std::move(accessor)) {
+ FileHandleCache::Accessor accessor,
RuntimeProfile* profile)
+ : _path(std::move(path)),
+ _name_node(name_node),
+ _accessor(std::move(accessor)),
+ _profile(profile) {
_handle = _accessor.get();
DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
+ if (_profile != nullptr) {
+#ifdef USE_HADOOP_HDFS
+ const char* hdfs_profile_name = "HdfsIO";
+ ADD_TIMER(_profile, hdfs_profile_name);
+ _hdfs_profile.total_bytes_read =
+ ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES,
hdfs_profile_name);
+ _hdfs_profile.total_local_bytes_read =
+ ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead",
TUnit::BYTES, hdfs_profile_name);
+ _hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER(
+ _profile, "TotalShortCircuitBytesRead", TUnit::BYTES,
hdfs_profile_name);
+ _hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER(
+ _profile, "TotalZeroCopyBytesRead", TUnit::BYTES,
hdfs_profile_name);
+#endif
+ }
}
HdfsFileReader::~HdfsFileReader() {
@@ -52,6 +69,25 @@ Status HdfsFileReader::close() {
bool expected = false;
if (_closed.compare_exchange_strong(expected, true,
std::memory_order_acq_rel)) {
DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
+ if (_profile != nullptr) {
+#ifdef USE_HADOOP_HDFS
+ struct hdfsReadStatistics* hdfs_statistics = nullptr;
+ auto r = hdfsFileGetReadStatistics(_handle->file(),
&hdfs_statistics);
+ if (r != 0) {
+ return Status::InternalError(
+ fmt::format("Failed to run
hdfsFileGetReadStatistics(): {}", r));
+ }
+ COUNTER_UPDATE(_hdfs_profile.total_bytes_read,
hdfs_statistics->totalBytesRead);
+ COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read,
+ hdfs_statistics->totalLocalBytesRead);
+ COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
+ hdfs_statistics->totalShortCircuitBytesRead);
+ COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
+ hdfs_statistics->totalZeroCopyBytesRead);
+ hdfsFileFreeReadStatistics(hdfs_statistics);
+ hdfsFileClearReadStatistics(_handle->file());
+#endif
+ }
}
return Status::OK();
}
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index efff1bfcd6..864a55bc41 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -38,7 +38,8 @@ class IOContext;
class HdfsFileReader : public FileReader {
public:
- HdfsFileReader(Path path, const std::string& name_node,
FileHandleCache::Accessor accessor);
+ HdfsFileReader(Path path, const std::string& name_node,
FileHandleCache::Accessor accessor,
+ RuntimeProfile* profile);
~HdfsFileReader() override;
@@ -57,11 +58,24 @@ protected:
const IOContext* io_ctx) override;
private:
+#ifdef USE_HADOOP_HDFS
+ struct HDFSProfile {
+ RuntimeProfile::Counter* total_bytes_read;
+ RuntimeProfile::Counter* total_local_bytes_read;
+ RuntimeProfile::Counter* total_short_circuit_bytes_read;
+ RuntimeProfile::Counter* total_total_zero_copy_bytes_read;
+ };
+#endif
+
Path _path;
const std::string& _name_node;
FileHandleCache::Accessor _accessor;
CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
std::atomic<bool> _closed = false;
+ RuntimeProfile* _profile;
+#ifdef USE_HADOOP_HDFS
+ HDFSProfile _hdfs_profile;
+#endif
};
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/hdfs_file_system.cpp
b/be/src/io/fs/hdfs_file_system.cpp
index 775754bd4d..5e90f04dba 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -123,7 +123,7 @@ Status HdfsFileHandleCache::get_file(const
std::shared_ptr<HdfsFileSystem>& fs,
}
Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const
std::string& path,
- std::shared_ptr<HdfsFileSystem>* fs) {
+ RuntimeProfile* profile,
std::shared_ptr<HdfsFileSystem>* fs) {
#ifdef USE_HADOOP_HDFS
if (!config::enable_java_support) {
return Status::InternalError(
@@ -131,14 +131,16 @@ Status HdfsFileSystem::create(const THdfsParams&
hdfs_params, const std::string&
"true.");
}
#endif
- (*fs).reset(new HdfsFileSystem(hdfs_params, path));
+ (*fs).reset(new HdfsFileSystem(hdfs_params, path, profile));
return (*fs)->connect();
}
-HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const
std::string& path)
+HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const
std::string& path,
+ RuntimeProfile* profile)
: RemoteFileSystem(path, "", FileSystemType::HDFS),
_hdfs_params(hdfs_params),
- _fs_handle(nullptr) {
+ _fs_handle(nullptr),
+ _profile(profile) {
_namenode = _hdfs_params.fs_name;
}
@@ -175,7 +177,7 @@ Status HdfsFileSystem::open_file_internal(const Path& file,
int64_t file_size,
std::static_pointer_cast<HdfsFileSystem>(shared_from_this()),
real_path, 0, file_size,
&accessor));
- *reader = std::make_shared<HdfsFileReader>(file, _namenode,
std::move(accessor));
+ *reader = std::make_shared<HdfsFileReader>(file, _namenode,
std::move(accessor), _profile);
return Status::OK();
}
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index bd28ec73c2..6a45a92b37 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -33,6 +33,7 @@
#include "io/fs/hdfs.h"
#include "io/fs/path.h"
#include "io/fs/remote_file_system.h"
+#include "util/runtime_profile.h"
namespace doris {
class THdfsParams;
@@ -111,7 +112,7 @@ class HdfsFileHandleCache;
class HdfsFileSystem final : public RemoteFileSystem {
public:
static Status create(const THdfsParams& hdfs_params, const std::string&
path,
- std::shared_ptr<HdfsFileSystem>* fs);
+ RuntimeProfile* profile,
std::shared_ptr<HdfsFileSystem>* fs);
~HdfsFileSystem() override;
@@ -148,12 +149,14 @@ private:
private:
friend class HdfsFileWriter;
- HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path);
+ HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path,
+ RuntimeProfile* profile);
const THdfsParams& _hdfs_params;
std::string _namenode;
// do not use std::shared_ptr or std::unique_ptr
// _fs_handle is managed by HdfsFileSystemCache
HdfsFileSystemHandle* _fs_handle;
+ RuntimeProfile* _profile;
};
} // namespace io
} // namespace doris
diff --git a/be/src/runtime/snapshot_loader.cpp
b/be/src/runtime/snapshot_loader.cpp
index f1b58fa454..3ff8229bc3 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -86,7 +86,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type,
const std::string& l
} else if (TStorageBackendType::type::HDFS == type) {
THdfsParams hdfs_params = parse_properties(_prop);
std::shared_ptr<io::HdfsFileSystem> fs;
- RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr,
&fs));
_remote_fs = std::move(fs);
} else if (TStorageBackendType::type::BROKER == type) {
std::shared_ptr<io::BrokerFileSystem> fs;
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp
b/be/src/vec/runtime/vfile_result_writer.cpp
index ed408e5f7a..8977cd0c47 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -595,7 +595,7 @@ Status VFileResultWriter::_delete_dir() {
case TStorageBackendType::HDFS: {
THdfsParams hdfs_params =
parse_properties(_file_opts->broker_properties);
std::shared_ptr<io::HdfsFileSystem> hdfs_fs = nullptr;
- RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &hdfs_fs));
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr,
&hdfs_fs));
file_system = hdfs_fs;
break;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]