This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7408df2affe [fix](fs) Fix hdfs file writer (#33680)
7408df2affe is described below

commit 7408df2affe3c227926b912b3c0f5780c1a4003f
Author: plat1ko <[email protected]>
AuthorDate: Wed Apr 17 16:26:50 2024 +0800

    [fix](fs) Fix hdfs file writer (#33680)
---
 be/src/io/file_factory.cpp                     |  8 ++---
 be/src/io/file_factory.h                       |  3 +-
 be/src/io/fs/hdfs_file_system.cpp              |  3 +-
 be/src/io/fs/hdfs_file_writer.cpp              | 49 +++++++++++++++++---------
 be/src/io/fs/hdfs_file_writer.h                |  8 +++--
 be/src/service/internal_service.cpp            |  6 +++-
 be/src/vec/core/block_spill_writer.cpp         |  6 +++-
 be/src/vec/sink/writer/vfile_result_writer.cpp |  6 +++-
 8 files changed, 59 insertions(+), 30 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 37132b300df..167fcf31b76 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -100,7 +100,8 @@ Result<io::FileSystemSPtr> FileFactory::create_fs(const io::FSPropertiesRef& fs_
 
 Result<io::FileWriterPtr> FileFactory::create_file_writer(
         TFileType::type type, ExecEnv* env, const std::vector<TNetworkAddress>& broker_addresses,
-        const std::map<std::string, std::string>& properties, const std::string& path) {
+        const std::map<std::string, std::string>& properties, const std::string& path,
+        const io::FileWriterOptions& options) {
     io::FileWriterPtr file_writer;
     switch (type) {
     case TFileType::FILE_LOCAL: {
@@ -117,16 +118,15 @@ Result<io::FileWriterPtr> FileFactory::create_file_writer(
         RETURN_IF_ERROR_RESULT(
                 S3ClientFactory::convert_properties_to_s3_conf(properties, s3_uri, &s3_conf));
         auto client = S3ClientFactory::instance().create(s3_conf.client_conf);
-        // TODO(plat1ko): Set opts
         return std::make_unique<io::S3FileWriter>(std::move(client), std::move(s3_conf.bucket),
-                                                  s3_uri.get_key(), nullptr);
+                                                  s3_uri.get_key(), &options);
     }
     case TFileType::FILE_HDFS: {
         THdfsParams hdfs_params = parse_properties(properties);
         io::HdfsHandler* handler;
         RETURN_IF_ERROR_RESULT(io::HdfsHandlerCache::instance()->get_connection(
                 hdfs_params, hdfs_params.fs_name, &handler));
-        auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name);
+        auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options);
         if (!res.has_value()) {
             handler->dec_ref();
         }
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 9a5f43ebcb2..9d9d714812f 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -94,7 +94,8 @@ public:
     static Result<io::FileWriterPtr> create_file_writer(
             TFileType::type type, ExecEnv* env,
             const std::vector<TNetworkAddress>& broker_addresses,
-            const std::map<std::string, std::string>& properties, const std::string& path);
+            const std::map<std::string, std::string>& properties, const std::string& path,
+            const io::FileWriterOptions& options);
 
     /// Create FileReader without FS
     static Result<io::FileReaderSPtr> create_file_reader(
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index 5ea742c20d5..4ab32920c8c 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -276,8 +276,7 @@ Status HdfsFileSystem::upload_impl(const Path& local_file, const Path& remote_fi
         left_len -= read_len;
     }
 
-    LOG(INFO) << "finished to write file: " << local_file << ", length: " << file_len;
-    return Status::OK();
+    return hdfs_writer->close();
 }
 
 Status HdfsFileSystem::batch_upload_impl(const std::vector<Path>& local_files,
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index fcb4ccfd74a..7efb4bfb073 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -34,13 +34,18 @@
 namespace doris::io {
 
 HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file,
-                               std::string fs_name)
+                               std::string fs_name, const FileWriterOptions* opts)
         : _path(std::move(path)),
           _hdfs_handler(handler),
           _hdfs_file(hdfs_file),
-          _fs_name(std::move(fs_name)) {}
+          _fs_name(std::move(fs_name)),
+          _sync_file_data(opts ? opts->sync_file_data : true) {}
 
 HdfsFileWriter::~HdfsFileWriter() {
+    if (_hdfs_file) {
+        hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    }
+
     if (_hdfs_handler->from_cache) {
         _hdfs_handler->dec_ref();
     } else {
@@ -53,23 +58,25 @@ Status HdfsFileWriter::close() {
         return Status::OK();
     }
     _closed = true;
-    int result = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
-    if (result == -1) {
-        std::stringstream ss;
-        ss << "failed to flush hdfs file. "
-           << "fs_name:" << _fs_name << " path:" << _path << ", err: " << hdfs_error();
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+
+    if (_sync_file_data) {
+        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        if (ret != 0) {
+            return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}",
+                                         _fs_name, _path.native(), hdfs_error());
+        }
     }
 
-    result = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
+    // the HDFS response, but won't guarantee the synchronization of data to HDFS.
+    int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
     _hdfs_file = nullptr;
-    if (result != 0) {
-        std::string err_msg = hdfs_error();
+    if (ret != 0) {
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
-                BackendOptions::get_localhost(), _fs_name, _path.string(), err_msg);
+                BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error());
     }
+
     return Status::OK();
 }
 
@@ -101,12 +108,20 @@ Status HdfsFileWriter::finalize() {
     if (_closed) [[unlikely]] {
         return Status::InternalError("finalize closed file: {}", _path.native());
     }
-    // FIXME(plat1ko): `finalize` method should not be an operation which can be blocked for a long time
-    return close();
+
+    // Flush buffered data to HDFS without waiting for HDFS response
+    int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
+    if (ret != 0) {
+        return Status::InternalError("failed to flush hdfs file. fs_name={} path={} : {}", _fs_name,
+                                     _path.native(), hdfs_error());
+    }
+
+    return Status::OK();
 }
 
 Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handler,
-                                             const std::string& fs_name) {
+                                             const std::string& fs_name,
+                                             const FileWriterOptions* opts) {
     auto path = convert_path(full_path, fs_name);
     std::string hdfs_dir = path.parent_path().string();
     int exists = hdfsExists(handler->hdfs_fs, hdfs_dir.c_str());
@@ -133,7 +148,7 @@ Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handle
         return ResultError(Status::InternalError(ss.str()));
     }
     VLOG_NOTICE << "open file. fs_name:" << fs_name << ", path:" << path;
-    return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name);
+    return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name, opts);
 }
 
 } // namespace doris::io
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 87781b759e6..7ad041c94b7 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -33,10 +33,11 @@ public:
     // - fs_name/path_to_file
     // - /path_to_file
     // TODO(plat1ko): Support related path for cloud mode
-    static Result<FileWriterPtr> create(Path path, HdfsHandler* handler,
-                                        const std::string& fs_name);
+    static Result<FileWriterPtr> create(Path path, HdfsHandler* handler, const std::string& fs_name,
+                                        const FileWriterOptions* opts = nullptr);
 
-    HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name);
+    HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name,
+                   const FileWriterOptions* opts = nullptr);
     ~HdfsFileWriter() override;
 
     Status close() override;
@@ -53,6 +54,7 @@ private:
     std::string _fs_name;
     size_t _bytes_appended = 0;
     bool _closed = false;
+    bool _sync_file_data;
 };
 
 } // namespace doris::io
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index aa9c207d689..2976928557e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -666,7 +666,11 @@ void PInternalService::outfile_write_success(google::protobuf::RpcController* co
         auto&& res = FileFactory::create_file_writer(
                 FileFactory::convert_storage_type(result_file_sink.storage_backend_type),
                 ExecEnv::GetInstance(), file_options.broker_addresses,
-                file_options.broker_properties, file_name);
+                file_options.broker_properties, file_name,
+                {
+                        .write_file_cache = false,
+                        .sync_file_data = false,
+                });
         using T = std::decay_t<decltype(res)>;
         if (!res.has_value()) [[unlikely]] {
             st = std::forward<T>(res).error();
diff --git a/be/src/vec/core/block_spill_writer.cpp b/be/src/vec/core/block_spill_writer.cpp
index 7ad2d1d8027..92fe34a3eb0 100644
--- a/be/src/vec/core/block_spill_writer.cpp
+++ b/be/src/vec/core/block_spill_writer.cpp
@@ -43,7 +43,11 @@ void BlockSpillWriter::_init_profile() {
 
 Status BlockSpillWriter::open() {
     file_writer_ = DORIS_TRY(FileFactory::create_file_writer(
-            TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_));
+            TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_,
+            {
+                    .write_file_cache = false,
+                    .sync_file_data = false,
+            }));
     is_open_ = true;
     return Status::OK();
 }
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 811658afa4d..d83b3ce5103 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -108,7 +108,11 @@ Status VFileResultWriter::_create_next_file_writer() {
 Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
     _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
             FileFactory::convert_storage_type(_storage_type), _state->exec_env(),
-            _file_opts->broker_addresses, _file_opts->broker_properties, file_name));
+            _file_opts->broker_addresses, _file_opts->broker_properties, file_name,
+            {
+                    .write_file_cache = false,
+                    .sync_file_data = false,
+            }));
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
         _vfile_writer.reset(new VCSVTransformer(_state, _file_writer_impl.get(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to