This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit 98fa4b548433bc392f4bbae4319f610cb8da133e Author: plat1ko <[email protected]> AuthorDate: Wed Apr 17 16:26:50 2024 +0800 [fix](fs) Fix hdfs file writer (#33680) --- be/src/io/file_factory.cpp | 8 ++--- be/src/io/file_factory.h | 3 +- be/src/io/fs/hdfs_file_system.cpp | 3 +- be/src/io/fs/hdfs_file_writer.cpp | 49 +++++++++++++++++--------- be/src/io/fs/hdfs_file_writer.h | 8 +++-- be/src/service/internal_service.cpp | 6 +++- be/src/vec/core/block_spill_writer.cpp | 6 +++- be/src/vec/sink/writer/vfile_result_writer.cpp | 6 +++- 8 files changed, 59 insertions(+), 30 deletions(-) diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 37132b300df..167fcf31b76 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -100,7 +100,8 @@ Result<io::FileSystemSPtr> FileFactory::create_fs(const io::FSPropertiesRef& fs_ Result<io::FileWriterPtr> FileFactory::create_file_writer( TFileType::type type, ExecEnv* env, const std::vector<TNetworkAddress>& broker_addresses, - const std::map<std::string, std::string>& properties, const std::string& path) { + const std::map<std::string, std::string>& properties, const std::string& path, + const io::FileWriterOptions& options) { io::FileWriterPtr file_writer; switch (type) { case TFileType::FILE_LOCAL: { @@ -117,16 +118,15 @@ Result<io::FileWriterPtr> FileFactory::create_file_writer( RETURN_IF_ERROR_RESULT( S3ClientFactory::convert_properties_to_s3_conf(properties, s3_uri, &s3_conf)); auto client = S3ClientFactory::instance().create(s3_conf.client_conf); - // TODO(plat1ko): Set opts return std::make_unique<io::S3FileWriter>(std::move(client), std::move(s3_conf.bucket), - s3_uri.get_key(), nullptr); + s3_uri.get_key(), &options); } case TFileType::FILE_HDFS: { THdfsParams hdfs_params = parse_properties(properties); io::HdfsHandler* handler; RETURN_IF_ERROR_RESULT(io::HdfsHandlerCache::instance()->get_connection( hdfs_params, hdfs_params.fs_name, &handler)); - auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name); + auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options); if (!res.has_value()) { handler->dec_ref(); } diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 9a5f43ebcb2..9d9d714812f 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -94,7 +94,8 @@ public: static Result<io::FileWriterPtr> create_file_writer( TFileType::type type, ExecEnv* env, const std::vector<TNetworkAddress>& broker_addresses, - const std::map<std::string, std::string>& properties, const std::string& path); + const std::map<std::string, std::string>& properties, const std::string& path, + const io::FileWriterOptions& options); /// Create FileReader without FS static Result<io::FileReaderSPtr> create_file_reader( diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 5ea742c20d5..4ab32920c8c 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -276,8 +276,7 @@ Status HdfsFileSystem::upload_impl(const Path& local_file, const Path& remote_fi left_len -= read_len; } - LOG(INFO) << "finished to write file: " << local_file << ", length: " << file_len; - return Status::OK(); + return hdfs_writer->close(); } Status HdfsFileSystem::batch_upload_impl(const std::vector<Path>& local_files, diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index fcb4ccfd74a..7efb4bfb073 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -34,13 +34,18 @@ namespace doris::io { HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, - std::string fs_name) + std::string fs_name, const FileWriterOptions* opts) : _path(std::move(path)), _hdfs_handler(handler), _hdfs_file(hdfs_file), - _fs_name(std::move(fs_name)) {} + _fs_name(std::move(fs_name)), + _sync_file_data(opts ? opts->sync_file_data : true) {} HdfsFileWriter::~HdfsFileWriter() { + if (_hdfs_file) { + hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + } + if (_hdfs_handler->from_cache) { _hdfs_handler->dec_ref(); } else { @@ -53,23 +58,25 @@ Status HdfsFileWriter::close() { return Status::OK(); } _closed = true; - int result = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file); - if (result == -1) { - std::stringstream ss; - ss << "failed to flush hdfs file. " - << "fs_name:" << _fs_name << " path:" << _path << ", err: " << hdfs_error(); - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); + + if (_sync_file_data) { + int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + if (ret != 0) { + return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}", + _fs_name, _path.native(), hdfs_error()); + } } - result = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for + // the HDFS response, but won't guarantee the synchronization of data to HDFS. + int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); _hdfs_file = nullptr; - if (result != 0) { - std::string err_msg = hdfs_error(); + if (ret != 0) { return Status::InternalError( "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}", - BackendOptions::get_localhost(), _fs_name, _path.string(), err_msg); + BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error()); } + return Status::OK(); } @@ -101,12 +108,20 @@ Status HdfsFileWriter::finalize() { if (_closed) [[unlikely]] { return Status::InternalError("finalize closed file: {}", _path.native()); } - // FIXME(plat1ko): `finalize` method should not be an operation which can be blocked for a long time - return close(); + + // Flush buffered data to HDFS without waiting for HDFS response + int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file); + if (ret != 0) { + return Status::InternalError("failed to flush hdfs file. fs_name={} path={} : {}", _fs_name, + _path.native(), hdfs_error()); + } + + return Status::OK(); } Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handler, - const std::string& fs_name) { + const std::string& fs_name, + const FileWriterOptions* opts) { auto path = convert_path(full_path, fs_name); std::string hdfs_dir = path.parent_path().string(); int exists = hdfsExists(handler->hdfs_fs, hdfs_dir.c_str()); @@ -133,7 +148,7 @@ Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handle return ResultError(Status::InternalError(ss.str())); } VLOG_NOTICE << "open file. fs_name:" << fs_name << ", path:" << path; - return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name); + return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name, opts); } } // namespace doris::io diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index 87781b759e6..7ad041c94b7 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -33,10 +33,11 @@ public: // - fs_name/path_to_file // - /path_to_file // TODO(plat1ko): Support related path for cloud mode - static Result<FileWriterPtr> create(Path path, HdfsHandler* handler, - const std::string& fs_name); + static Result<FileWriterPtr> create(Path path, HdfsHandler* handler, const std::string& fs_name, + const FileWriterOptions* opts = nullptr); - HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name); + HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name, + const FileWriterOptions* opts = nullptr); ~HdfsFileWriter() override; Status close() override; @@ -53,6 +54,7 @@ private: std::string _fs_name; size_t _bytes_appended = 0; bool _closed = false; + bool _sync_file_data; }; } // namespace doris::io diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index aa9c207d689..2976928557e 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -666,7 +666,11 @@ void PInternalService::outfile_write_success(google::protobuf::RpcController* co auto&& res = FileFactory::create_file_writer( FileFactory::convert_storage_type(result_file_sink.storage_backend_type), ExecEnv::GetInstance(), file_options.broker_addresses, - file_options.broker_properties, file_name); + file_options.broker_properties, file_name, + { + .write_file_cache = false, + .sync_file_data = false, + }); using T = std::decay_t<decltype(res)>; if (!res.has_value()) [[unlikely]] { st = std::forward<T>(res).error(); diff --git a/be/src/vec/core/block_spill_writer.cpp b/be/src/vec/core/block_spill_writer.cpp index 7ad2d1d8027..92fe34a3eb0 100644 --- a/be/src/vec/core/block_spill_writer.cpp +++ b/be/src/vec/core/block_spill_writer.cpp @@ -43,7 +43,11 @@ void BlockSpillWriter::_init_profile() { Status BlockSpillWriter::open() { file_writer_ = DORIS_TRY(FileFactory::create_file_writer( - TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_)); + TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_, + { + .write_file_cache = false, + .sync_file_data = false, + })); is_open_ = true; return Status::OK(); } diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index 811658afa4d..d83b3ce5103 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -108,7 +108,11 @@ Status VFileResultWriter::_create_next_file_writer() { Status VFileResultWriter::_create_file_writer(const std::string& file_name) { _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer( FileFactory::convert_storage_type(_storage_type), _state->exec_env(), - _file_opts->broker_addresses, _file_opts->broker_properties, file_name)); + _file_opts->broker_addresses, _file_opts->broker_properties, file_name, + { + .write_file_cache = false, + .sync_file_data = false, + })); switch (_file_opts->file_format) { case TFileFormatType::FORMAT_CSV_PLAIN: _vfile_writer.reset(new VCSVTransformer(_state, _file_writer_impl.get(), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
