This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7408df2affe [fix](fs) Fix hdfs file writer (#33680)
7408df2affe is described below
commit 7408df2affe3c227926b912b3c0f5780c1a4003f
Author: plat1ko <[email protected]>
AuthorDate: Wed Apr 17 16:26:50 2024 +0800
[fix](fs) Fix hdfs file writer (#33680)
---
 be/src/io/file_factory.cpp                     |  8 ++---
 be/src/io/file_factory.h                       |  3 +-
 be/src/io/fs/hdfs_file_system.cpp              |  3 +-
 be/src/io/fs/hdfs_file_writer.cpp              | 49 +++++++++++++++++---------
 be/src/io/fs/hdfs_file_writer.h                |  8 +++--
 be/src/service/internal_service.cpp            |  6 +++-
 be/src/vec/core/block_spill_writer.cpp         |  6 +++-
 be/src/vec/sink/writer/vfile_result_writer.cpp |  6 +++-
 8 files changed, 59 insertions(+), 30 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 37132b300df..167fcf31b76 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -100,7 +100,8 @@ Result<io::FileSystemSPtr> FileFactory::create_fs(const io::FSPropertiesRef& fs_
 Result<io::FileWriterPtr> FileFactory::create_file_writer(
         TFileType::type type, ExecEnv* env, const std::vector<TNetworkAddress>& broker_addresses,
-        const std::map<std::string, std::string>& properties, const std::string& path) {
+        const std::map<std::string, std::string>& properties, const std::string& path,
+        const io::FileWriterOptions& options) {
     io::FileWriterPtr file_writer;
     switch (type) {
     case TFileType::FILE_LOCAL: {
@@ -117,16 +118,15 @@ Result<io::FileWriterPtr> FileFactory::create_file_writer(
         RETURN_IF_ERROR_RESULT(
                 S3ClientFactory::convert_properties_to_s3_conf(properties, s3_uri, &s3_conf));
         auto client = S3ClientFactory::instance().create(s3_conf.client_conf);
-        // TODO(plat1ko): Set opts
         return std::make_unique<io::S3FileWriter>(std::move(client), std::move(s3_conf.bucket),
-                                                  s3_uri.get_key(), nullptr);
+                                                  s3_uri.get_key(), &options);
     }
     case TFileType::FILE_HDFS: {
         THdfsParams hdfs_params = parse_properties(properties);
         io::HdfsHandler* handler;
         RETURN_IF_ERROR_RESULT(io::HdfsHandlerCache::instance()->get_connection(
                 hdfs_params, hdfs_params.fs_name, &handler));
-        auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name);
+        auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options);
         if (!res.has_value()) {
             handler->dec_ref();
         }
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 9a5f43ebcb2..9d9d714812f 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -94,7 +94,8 @@ public:
     static Result<io::FileWriterPtr> create_file_writer(
             TFileType::type type, ExecEnv* env,
             const std::vector<TNetworkAddress>& broker_addresses,
-            const std::map<std::string, std::string>& properties, const std::string& path);
+            const std::map<std::string, std::string>& properties, const std::string& path,
+            const io::FileWriterOptions& options);
 
     /// Create FileReader without FS
     static Result<io::FileReaderSPtr> create_file_reader(
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index 5ea742c20d5..4ab32920c8c 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -276,8 +276,7 @@ Status HdfsFileSystem::upload_impl(const Path& local_file, const Path& remote_fi
         left_len -= read_len;
     }
 
-    LOG(INFO) << "finished to write file: " << local_file << ", length: " << file_len;
-    return Status::OK();
+    return hdfs_writer->close();
 }
 
 Status HdfsFileSystem::batch_upload_impl(const std::vector<Path>& local_files,
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index fcb4ccfd74a..7efb4bfb073 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -34,13 +34,18 @@
 namespace doris::io {
 
 HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file,
-                               std::string fs_name)
+                               std::string fs_name, const FileWriterOptions* opts)
         : _path(std::move(path)),
           _hdfs_handler(handler),
           _hdfs_file(hdfs_file),
-          _fs_name(std::move(fs_name)) {}
+          _fs_name(std::move(fs_name)),
+          _sync_file_data(opts ? opts->sync_file_data : true) {}
 
 HdfsFileWriter::~HdfsFileWriter() {
+    if (_hdfs_file) {
+        hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    }
+
     if (_hdfs_handler->from_cache) {
         _hdfs_handler->dec_ref();
     } else {
@@ -53,23 +58,25 @@ Status HdfsFileWriter::close() {
         return Status::OK();
     }
     _closed = true;
-    int result = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
-    if (result == -1) {
-        std::stringstream ss;
-        ss << "failed to flush hdfs file. "
-           << "fs_name:" << _fs_name << " path:" << _path << ", err: " << hdfs_error();
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+
+    if (_sync_file_data) {
+        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        if (ret != 0) {
+            return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}",
+                                         _fs_name, _path.native(), hdfs_error());
+        }
     }
-    result = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+
+    // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
+    // the HDFS response, but won't guarantee the synchronization of data to HDFS.
+    int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
     _hdfs_file = nullptr;
-    if (result != 0) {
-        std::string err_msg = hdfs_error();
+    if (ret != 0) {
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
-                BackendOptions::get_localhost(), _fs_name, _path.string(), err_msg);
+                BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error());
     }
+
     return Status::OK();
 }
@@ -101,12 +108,20 @@ Status HdfsFileWriter::finalize() {
     if (_closed) [[unlikely]] {
         return Status::InternalError("finalize closed file: {}", _path.native());
     }
-    // FIXME(plat1ko): `finalize` method should not be an operation which can be blocked for a long time
-    return close();
+
+    // Flush buffered data to HDFS without waiting for HDFS response
+    int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
+    if (ret != 0) {
+        return Status::InternalError("failed to flush hdfs file. fs_name={} path={} : {}", _fs_name,
+                                     _path.native(), hdfs_error());
+    }
+
+    return Status::OK();
 }
 
 Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handler,
-                                             const std::string& fs_name) {
+                                             const std::string& fs_name,
+                                             const FileWriterOptions* opts) {
     auto path = convert_path(full_path, fs_name);
     std::string hdfs_dir = path.parent_path().string();
     int exists = hdfsExists(handler->hdfs_fs, hdfs_dir.c_str());
@@ -133,7 +148,7 @@ Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handle
         return ResultError(Status::InternalError(ss.str()));
     }
     VLOG_NOTICE << "open file. fs_name:" << fs_name << ", path:" << path;
-    return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name);
+    return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name, opts);
 }
 
 } // namespace doris::io
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 87781b759e6..7ad041c94b7 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -33,10 +33,11 @@ public:
     //   - fs_name/path_to_file
     //   - /path_to_file
     // TODO(plat1ko): Support related path for cloud mode
-    static Result<FileWriterPtr> create(Path path, HdfsHandler* handler,
-                                        const std::string& fs_name);
+    static Result<FileWriterPtr> create(Path path, HdfsHandler* handler, const std::string& fs_name,
+                                        const FileWriterOptions* opts = nullptr);
 
-    HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name);
+    HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name,
+                   const FileWriterOptions* opts = nullptr);
     ~HdfsFileWriter() override;
 
     Status close() override;
@@ -53,6 +54,7 @@ private:
     std::string _fs_name;
     size_t _bytes_appended = 0;
     bool _closed = false;
+    bool _sync_file_data;
 };
 
 } // namespace doris::io
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index aa9c207d689..2976928557e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -666,7 +666,11 @@ void PInternalService::outfile_write_success(google::protobuf::RpcController* co
     auto&& res = FileFactory::create_file_writer(
             FileFactory::convert_storage_type(result_file_sink.storage_backend_type),
             ExecEnv::GetInstance(), file_options.broker_addresses,
-            file_options.broker_properties, file_name);
+            file_options.broker_properties, file_name,
+            {
+                    .write_file_cache = false,
+                    .sync_file_data = false,
+            });
     using T = std::decay_t<decltype(res)>;
     if (!res.has_value()) [[unlikely]] {
         st = std::forward<T>(res).error();
diff --git a/be/src/vec/core/block_spill_writer.cpp b/be/src/vec/core/block_spill_writer.cpp
index 7ad2d1d8027..92fe34a3eb0 100644
--- a/be/src/vec/core/block_spill_writer.cpp
+++ b/be/src/vec/core/block_spill_writer.cpp
@@ -43,7 +43,11 @@ void BlockSpillWriter::_init_profile() {
 
 Status BlockSpillWriter::open() {
     file_writer_ = DORIS_TRY(FileFactory::create_file_writer(
-            TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_));
+            TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_,
+            {
+                    .write_file_cache = false,
+                    .sync_file_data = false,
+            }));
     is_open_ = true;
     return Status::OK();
 }
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 811658afa4d..d83b3ce5103 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -108,7 +108,11 @@ Status VFileResultWriter::_create_next_file_writer() {
 Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
     _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
             FileFactory::convert_storage_type(_storage_type), _state->exec_env(),
-            _file_opts->broker_addresses, _file_opts->broker_properties, file_name));
+            _file_opts->broker_addresses, _file_opts->broker_properties, file_name,
+            {
+                    .write_file_cache = false,
+                    .sync_file_data = false,
+            }));
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
         _vfile_writer.reset(new VCSVTransformer(_state, _file_writer_impl.get(),
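
For context on the libhdfs calls this patch distinguishes: hdfsFlush/hdfsHFlush only push buffered
data toward HDFS, while hdfsHSync waits until the data is actually synchronized, which is why
finalize() now merely flushes and close() only syncs when sync_file_data is set. Below is a minimal
standalone sketch of that call sequence against the plain libhdfs C API; the namenode address,
file path, and error handling are illustrative placeholders, not part of this commit.

    #include <fcntl.h>     // O_WRONLY, O_CREAT
    #include <hdfs/hdfs.h> // libhdfs C API; the header path may differ per distribution

    #include <cstdio>
    #include <cstring>

    int main() {
        // "default" resolves fs.defaultFS from the loaded HDFS configuration.
        hdfsFS fs = hdfsConnect("default", 0);
        if (fs == nullptr) return 1;

        // Placeholder path for illustration only.
        hdfsFile file = hdfsOpenFile(fs, "/tmp/hdfs_writer_example.txt", O_WRONLY | O_CREAT, 0, 0, 0);
        if (file == nullptr) return 1;

        const char* data = "hello hdfs";
        hdfsWrite(fs, file, data, static_cast<tSize>(std::strlen(data)));

        // Roughly what HdfsFileWriter::finalize() now does: flush buffered bytes
        // without waiting for them to be durable on the datanodes.
        if (hdfsFlush(fs, file) != 0) std::perror("hdfsFlush");

        // Roughly what close() does when sync_file_data is true: block until the
        // data is synchronized to HDFS.
        if (hdfsHSync(fs, file) != 0) std::perror("hdfsHSync");

        // hdfsCloseFile flushes (hdfsHFlush-style) and waits for the HDFS response,
        // but does not guarantee durability, hence the explicit sync above.
        hdfsCloseFile(fs, file);
        hdfsDisconnect(fs);
        return 0;
    }

Call sites that do not need durability, such as the outfile, spill, and result writers above, can
therefore pass sync_file_data = false and skip the extra round trip at close time.
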
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]