This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a8d8c6a2711 [fix](file-writer) opt s3 file writer and fix empty file related issue #28983 #30703 #31169 (#31213)
a8d8c6a2711 is described below

commit a8d8c6a2711ac18951ee7d8384de9588c62902a7
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Feb 21 16:48:54 2024 +0800

    [fix](file-writer) opt s3 file writer and fix empty file related issue #28983 #30703 #31169 (#31213)
    
    * (feature)(cloud) Use dynamic allocator instead of static buffer pool for better elasticity. (#28983)
    
    * [fix](outfile) Fix unable to export empty data (#30703)
    
    Issue Number: close #30600
    Fix being unable to export empty data to HDFS/S3. This behavior is inconsistent with
    version 1.2.7, which can export empty data and leaves exported files on S3/HDFS.
    
    * [fix](file-writer) avoid empty file for segment writer (#31169)
    
    ---------
    
    Co-authored-by: AlexYue <[email protected]>
    Co-authored-by: zxealous <[email protected]>
---
 be/src/common/config.cpp                           |   4 -
 be/src/common/config.h                             |   4 -
 be/src/io/fs/benchmark/fs_benchmark_tool.cpp       |   2 -
 be/src/io/fs/broker_file_system.cpp                |   2 +-
 be/src/io/fs/broker_file_writer.cpp                |  15 +-
 be/src/io/fs/broker_file_writer.h                  |   3 +-
 be/src/io/fs/file_writer.h                         |   6 +
 be/src/io/fs/hdfs_file_system.cpp                  |   4 +-
 be/src/io/fs/hdfs_file_writer.cpp                  |  12 +-
 be/src/io/fs/hdfs_file_writer.h                    |   3 +-
 be/src/io/fs/s3_file_bufferpool.cpp                | 340 ++++++++-------------
 be/src/io/fs/s3_file_bufferpool.h                  | 103 ++-----
 be/src/io/fs/s3_file_writer.cpp                    |  70 +++--
 be/src/olap/rowset/beta_rowset_writer.cpp          |   2 +-
 be/src/runtime/exec_env.h                          |   3 -
 be/src/runtime/exec_env_init.cpp                   |   6 -
 be/src/vec/sink/writer/vfile_result_writer.cpp     |   1 +
 be/test/io/fs/s3_file_writer_test.cpp              |   7 -
 .../outfile/csv/test_outfile_empty_data.out        |   9 +
 .../outfile/csv/test_outfile_empty_data.groovy     | 166 ++++++++++
 20 files changed, 407 insertions(+), 355 deletions(-)
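
Background on the first change: before this patch, every S3 file writer borrowed a Slice from a
process-wide S3FileBufferPool sized by s3_write_buffer_whole_size (500MB by default), so at most
s3_write_buffer_whole_size / s3_write_buffer_size uploads could buffer data at once. The patch
removes the pool and lets each FileBuffer own a dynamically allocated region of
s3_write_buffer_size bytes (the Memory<> struct in the diff below). A minimal standalone sketch of
that ownership model, using a hypothetical OwnedBuffer in place of the patch's Memory<> template:

    #include <cstddef>
    #include <cstdlib>
    #include <new>

    // RAII buffer: each writer allocates and frees its own memory,
    // instead of borrowing a fixed Slice from a shared pool.
    struct OwnedBuffer {
        explicit OwnedBuffer(std::size_t size)
                : _size(size), _data(static_cast<char*>(std::malloc(size))) {
            if (_data == nullptr) {
                throw std::bad_alloc();
            }
        }
        ~OwnedBuffer() { std::free(_data); }
        OwnedBuffer(const OwnedBuffer&) = delete;
        OwnedBuffer& operator=(const OwnedBuffer&) = delete;

        char* data() const { return _data; }
        std::size_t size() const { return _size; }

    private:
        std::size_t _size;
        char* _data;
    };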

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ad8f2aece43..709526dbf42 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1040,10 +1040,6 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000");
 DEFINE_mInt64(row_column_page_size, "4096");
 // it must be larger than or equal to 5MB
 DEFINE_mInt32(s3_write_buffer_size, "5242880");
-// the size of the whole s3 buffer pool, which indicates the s3 file writer
-// can at most buffer 50MB data. And the num of multi part upload task is
-// s3_write_buffer_whole_size / s3_write_buffer_size
-DEFINE_mInt32(s3_write_buffer_whole_size, "524288000");
 // The timeout config for S3 buffer allocation
 DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
 DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e75d6d472a8..e0e5c52e30c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1077,10 +1077,6 @@ DECLARE_mInt32(tablet_path_check_batch_size);
 DECLARE_mInt64(row_column_page_size);
 // it must be larger than or equal to 5MB
 DECLARE_mInt32(s3_write_buffer_size);
-// the size of the whole s3 buffer pool, which indicates the s3 file writer
-// can at most buffer 50MB data. And the num of multi part upload task is
-// s3_write_buffer_whole_size / s3_write_buffer_size
-DECLARE_mInt32(s3_write_buffer_whole_size);
 // The timeout config for S3 buffer allocation
 DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
 // the max number of cached file handle for block segemnt
diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
index 0ca9edc530a..9b7ec178d77 100644
--- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
+++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -119,8 +119,6 @@ int main(int argc, char** argv) {
                               .set_min_threads(num_cores)
                               .set_max_threads(num_cores)
                               .build(&s3_file_upload_thread_pool));
-    doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
-    s3_buffer_pool->init(524288000, 5242880, s3_file_upload_thread_pool.get());
 
     try {
         doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads),
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
index 576a7dd8959..44582a0ff8d 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() {
 Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer,
                                           const FileWriterOptions* opts) {
     *writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(), _broker_addr, _broker_prop,
-                                                 path, 0 /* offset */, getSPtr());
+                                                 path, 0 /* offset */, getSPtr(), opts);
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp
index d5b2baa7a66..75acf40084c 100644
--- a/be/src/io/fs/broker_file_writer.cpp
+++ b/be/src/io/fs/broker_file_writer.cpp
@@ -37,12 +37,15 @@ namespace io {
 
 BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
                                    const std::map<std::string, std::string>& properties,
-                                   const std::string& path, int64_t start_offset, FileSystemSPtr fs)
+                                   const std::string& path, int64_t start_offset, FileSystemSPtr fs,
+                                   const FileWriterOptions* opts)
         : FileWriter(path, fs),
           _env(env),
           _address(broker_address),
           _properties(properties),
-          _cur_offset(start_offset) {}
+          _cur_offset(start_offset) {
+    _create_empty_file = opts ? opts->create_empty_file : true;
+}
 
 BrokerFileWriter::~BrokerFileWriter() {
     if (_opened) {
@@ -153,6 +156,14 @@ Status BrokerFileWriter::finalize() {
     return Status::OK();
 }
 
+Status BrokerFileWriter::open() {
+    if (_create_empty_file && !_opened) {
+        RETURN_IF_ERROR(_open());
+        _opened = true;
+    }
+    return Status::OK();
+}
+
 Status BrokerFileWriter::_open() {
     TBrokerOpenWriterRequest request;
 
diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h
index cf5b8013acb..05b62846e6c 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -42,9 +42,10 @@ class BrokerFileWriter : public FileWriter {
 public:
     BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
                      const std::map<std::string, std::string>& properties, const std::string& path,
-                     int64_t start_offset, FileSystemSPtr fs);
+                     int64_t start_offset, FileSystemSPtr fs, const FileWriterOptions* opts);
     virtual ~BrokerFileWriter();
 
+    Status open() override;
     Status close() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
     Status finalize() override;
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 58c9c9ff060..bb3235e7d27 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -35,6 +35,8 @@ struct FileWriterOptions {
     bool is_cold_data = false;
     bool sync_file_data = true;        // Whether flush data into storage system
     int64_t file_cache_expiration = 0; // Absolute time
+    // Whether to create empty file if no content
+    bool create_empty_file = true;
 };
 
 class FileWriter {
@@ -46,6 +48,9 @@ public:
 
     DISALLOW_COPY_AND_ASSIGN(FileWriter);
 
+    // Open the file for writing.
+    virtual Status open() { return Status::OK(); }
+
     // Normal close. Wait for all data to persist before returning.
     virtual Status close() = 0;
 
@@ -74,6 +79,7 @@ protected:
     std::shared_ptr<FileSystem> _fs;
     bool _closed = false;
     bool _opened = false;
+    bool _create_empty_file = true;
 };
 
 using FileWriterPtr = std::unique_ptr<FileWriter>;
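
The two additions above are the hook for all three fixes: FileWriterOptions::create_empty_file
controls whether a file should exist even when nothing is written, and the new virtual
FileWriter::open() lets export paths create the file eagerly, while internal writers (e.g. segment
files) pass create_empty_file = false so nothing is created until data arrives. A hypothetical
caller sketch, assuming the public FileSystem::create_file mirrors create_file_impl's signature:

    // Usage sketch (not from the patch): export paths keep the default
    // create_empty_file = true and call open() up front, so an empty
    // object still appears when the query returns no rows.
    doris::Status export_result(doris::io::FileSystemSPtr fs, const doris::io::Path& path) {
        doris::io::FileWriterOptions opts; // create_empty_file defaults to true
        doris::io::FileWriterPtr writer;
        RETURN_IF_ERROR(fs->create_file(path, &writer, &opts));
        RETURN_IF_ERROR(writer->open()); // creates the file now; a no-op for writers without one
        // ... appendv() result batches, possibly none at all ...
        return writer->close();
    }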
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index 8ada4b92acc..a65784226f3 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -167,8 +167,8 @@ Status HdfsFileSystem::connect_impl() {
 }
 
 Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
-                                        const FileWriterOptions*) {
-    *writer = std::make_unique<HdfsFileWriter>(file, getSPtr());
+                                        const FileWriterOptions* opts) {
+    *writer = std::make_unique<HdfsFileWriter>(file, getSPtr(), opts);
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index 00081db310f..40c3c59dcd7 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -34,7 +34,9 @@
 namespace doris {
 namespace io {
 
-HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) : FileWriter(std::move(file), fs) {
+HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts)
+        : FileWriter(std::move(file), fs) {
+    _create_empty_file = opts ? opts->create_empty_file : true;
     _hdfs_fs = (HdfsFileSystem*)_fs.get();
 }
 
@@ -103,6 +105,14 @@ Status HdfsFileWriter::finalize() {
     return Status::OK();
 }
 
+Status HdfsFileWriter::open() {
+    if (_create_empty_file && !_opened) {
+        RETURN_IF_ERROR(_open());
+        _opened = true;
+    }
+    return Status::OK();
+}
+
 Status HdfsFileWriter::_open() {
     _path = convert_path(_path, _hdfs_fs->_fs_name);
     std::string hdfs_dir = _path.parent_path().string();
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index c05f7625124..21dcaff1cd6 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -33,9 +33,10 @@ class HdfsFileSystem;
 
 class HdfsFileWriter : public FileWriter {
 public:
-    HdfsFileWriter(Path file, FileSystemSPtr fs);
+    HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts);
     ~HdfsFileWriter();
 
+    Status open() override;
     Status close() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
     Status finalize() override;
diff --git a/be/src/io/fs/s3_file_bufferpool.cpp b/be/src/io/fs/s3_file_bufferpool.cpp
index a4f75319ec0..4efbd439429 100644
--- a/be/src/io/fs/s3_file_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_bufferpool.cpp
@@ -17,50 +17,73 @@
 
 #include "s3_file_bufferpool.h"
 
+#include <chrono>
+#include <memory>
+
 #include "common/config.h"
+#include "common/exception.h"
 #include "common/logging.h"
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "io/cache/block/block_file_cache_fwd.h"
 #include "io/cache/block/block_file_segment.h"
 #include "io/fs/s3_common.h"
 #include "runtime/exec_env.h"
 #include "util/defer_op.h"
 #include "util/slice.h"
+#include "vec/common/arena.h"
 
 namespace doris {
 namespace io {
 
 bvar::Adder<uint64_t> s3_file_buffer_allocated("s3_file_buffer_allocated");
-bvar::Adder<uint64_t> s3_file_buffer_allocating("s3_file_buffer_allocating");
 
-/**
- * 0. check if the inner memory buffer is empty or not
- * 1. relcaim the memory buffer if it's mot empty
- */
-void FileBuffer::on_finish() {
-    if (_buffer.empty()) {
-        return;
+template <typename Allocator = Allocator<false>>
+struct Memory : boost::noncopyable, Allocator {
+    Memory() = default;
+    explicit Memory(size_t size) : _size(size) {
+        alloc(size);
+        s3_file_buffer_allocated << 1;
     }
-    S3FileBufferPool::GetInstance()->reclaim(Slice {_buffer.get_data(), _capacity});
-    _buffer.clear();
-}
+    ~Memory() {
+        dealloc();
+        s3_file_buffer_allocated << -1;
+    }
+    void alloc(size_t size) { _data = static_cast<char*>(Allocator::alloc(size, 0)); }
+    void dealloc() {
+        if (_data == nullptr) {
+            return;
+        }
+        Allocator::free(_data, _size);
+        _data = nullptr;
+    }
+    size_t _size;
+    char* _data;
+};
 
-/**
- * take other buffer's memory space and refresh capacity
- */
-void FileBuffer::swap_buffer(Slice& other) {
-    _buffer = other;
-    _capacity = _buffer.get_size();
-    other.clear();
+struct FileBuffer::PartData {
+    Memory<> _memory;
+    PartData() : _memory(config::s3_write_buffer_size) {}
+    ~PartData() = default;
+    [[nodiscard]] Slice data() const { return Slice {_memory._data, _memory._size}; }
+    [[nodiscard]] size_t size() const { return _memory._size; }
+};
+
+Slice FileBuffer::get_slice() const {
+    return _inner_data->data();
 }
 
-FileBuffer::FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset,
-                       OperationState state, bool reserve)
-        : _alloc_holder(std::move(alloc_holder)),
-          _buffer(S3FileBufferPool::GetInstance()->allocate(reserve)),
+FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder,
+                       size_t offset, OperationState state)
+        : _type(type),
+          _alloc_holder(std::move(alloc_holder)),
           _offset(offset),
           _size(0),
           _state(std::move(state)),
-          _capacity(_buffer.get_size()) {}
+          _inner_data(std::make_unique<FileBuffer::PartData>()),
+          _capacity(_inner_data->size()) {}
 
+FileBuffer::~FileBuffer() = default;
 /**
  * 0. check if file cache holder allocated
  * 1. update the cache's type to index cache
@@ -86,136 +109,69 @@ void UploadFileBuffer::set_index_offset(size_t offset) {
  * 1. write to file cache otherwise, then we'll wait for free buffer and to rob it
  */
 Status UploadFileBuffer::append_data(const Slice& data) {
-    Defer defer {[&] { _size += data.get_size(); }};
-    while (true) {
-        // if buf is not empty, it means there is memory preserved for this buf
-        if (!_buffer.empty()) {
-            std::memcpy((void*)(_buffer.get_data() + _size), data.get_data(), data.get_size());
-            break;
-        }
-        // if the buf has no memory reserved, then write to disk first
-        if (!_is_cache_allocated && config::enable_file_cache && _alloc_holder != nullptr) {
-            _holder = _alloc_holder();
-            bool cache_is_not_enough = false;
-            for (auto& segment : _holder->file_segments) {
-                DCHECK(segment->state() == FileBlock::State::SKIP_CACHE ||
-                       segment->state() == FileBlock::State::EMPTY);
-                if (segment->state() == FileBlock::State::SKIP_CACHE) [[unlikely]] {
-                    cache_is_not_enough = true;
-                    break;
-                }
-                if (_index_offset != 0) {
-                    RETURN_IF_ERROR(segment->change_cache_type_self(CacheType::INDEX));
-                }
-            }
-            // if cache_is_not_enough, cannot use it !
-            _cur_file_segment = _holder->file_segments.begin();
-            _append_offset = (*_cur_file_segment)->range().left;
-            _holder = cache_is_not_enough ? nullptr : std::move(_holder);
-            if (_holder) {
-                (*_cur_file_segment)->get_or_set_downloader();
-            }
-            _is_cache_allocated = true;
-        }
-        if (_holder) [[likely]] {
-            size_t data_remain_size = data.get_size();
-            size_t pos = 0;
-            while (data_remain_size != 0) {
-                auto range = (*_cur_file_segment)->range();
-                size_t segment_remain_size = range.right - _append_offset + 1;
-                size_t append_size = std::min(data_remain_size, segment_remain_size);
-                Slice append_data(data.get_data() + pos, append_size);
-                // When there is no available free memory buffer, the data will be written to the cache first
-                // and then uploaded to S3 when there is an available free memory buffer.
-                // However, if an error occurs during the write process to the local cache,
-                // continuing to upload the dirty data from the cache to S3 will result in erroneous data(Bad segment).
-                // Considering that local disk write failures are rare, a simple approach is chosen here,
-                // which is to treat the import as a failure directly when a local write failure occurs
-                RETURN_IF_ERROR((*_cur_file_segment)->append(append_data));
-                if (segment_remain_size == append_size) {
-                    RETURN_IF_ERROR((*_cur_file_segment)->finalize_write());
-                    if (++_cur_file_segment != _holder->file_segments.end()) {
-                        (*_cur_file_segment)->get_or_set_downloader();
-                    }
-                }
-                data_remain_size -= append_size;
-                _append_offset += append_size;
-                pos += append_size;
-            }
-            break;
-        } else {
-            // wait allocate buffer pool
-            auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
-            if (tmp.empty()) [[unlikely]] {
-                return Status::InternalError("Failed to allocate S3 buffer for {} seconds",
-                                             config::s3_writer_buffer_allocation_timeout);
-            }
-            swap_buffer(tmp);
-        }
-    }
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::append_data", 
Status::OK());
+    std::memcpy((void*)(_inner_data->data().get_data() + _size), 
data.get_data(), data.get_size());
+    _size += data.get_size();
+    _crc_value = crc32c::Extend(_crc_value, data.get_data(), data.get_size());
     return Status::OK();
 }
 
 /**
- * 0. allocate one memory buffer
- * 1. read the content from the cache and then write
- * it into memory buffer
+ * 0. construct the stream ptr if the buffer is not empty
+ * 1. submit the on_upload() callback to executor
  */
-void UploadFileBuffer::read_from_cache() {
-    auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
-    if (tmp.empty()) [[unlikely]] {
-        set_val(Status::InternalError("Failed to allocate S3 buffer for {} seconds",
-                                      config::s3_writer_buffer_allocation_timeout));
-        return;
-    }
-    swap_buffer(tmp);
+static Status submit_upload_buffer(std::shared_ptr<FileBuffer> buffer) {
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::submit", 
Status::OK(), buffer.get());
+    return ExecEnv::GetInstance()->s3_file_upload_thread_pool()->submit_func(
+            [buf = std::move(buffer)]() { buf->execute_async(); });
+}
 
-    DCHECK(_holder != nullptr);
-    DCHECK(_capacity >= _size);
-    size_t pos = 0;
-    for (auto& segment : _holder->file_segments) {
-        if (pos == _size) {
-            break;
-        }
-        if (auto s = segment->finalize_write(); !s.ok()) [[unlikely]] {
-            set_val(std::move(s));
-            return;
-        }
-        size_t segment_size = segment->range().size();
-        Slice s(_buffer.get_data() + pos, segment_size);
-        if (auto st = segment->read_at(s, 0); !st.ok()) [[unlikely]] {
-            set_val(std::move(st));
-            return;
-        }
-        pos += segment_size;
+std::ostream& operator<<(std::ostream& os, const BufferType& value) {
+    switch (value) {
+    case BufferType::UPLOAD:
+        os << "upload";
+        break;
+    case BufferType::DOWNLOAD:
+        os << "download";
+        break;
+    default:
+        auto cast_value = static_cast<uint32_t>(value);
+        os << cast_value;
     }
+    return os;
+}
 
-    // the real lenght should be the buf.get_size() in this situation(consider it's the last part,
-    // size of it could be less than 5MB)
-    _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), _size);
+Status FileBuffer::submit(std::shared_ptr<FileBuffer> buf) {
+    switch (buf->_type) {
+    case BufferType::UPLOAD:
+        return submit_upload_buffer(std::move(buf));
+        break;
+    default:
+        CHECK(false) << "should never come here, the illegal type is " << buf->_type;
+    };
+    return Status::InternalError("should never come here");
 }
 
-/**
- * 0. constrcut the stream ptr if the buffer is not empty
- * 1. submit the on_upload() callback to executor
- */
-void UploadFileBuffer::submit() {
-    if (!_buffer.empty()) [[likely]] {
-        _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), _size);
+void UploadFileBuffer::on_upload() {
+    _stream_ptr = std::make_shared<StringViewStream>(_inner_data->data().get_data(), _size);
+    if (_crc_value != crc32c::Value(_inner_data->data().get_data(), _size)) {
+        DCHECK(false);
+        set_status(Status::IOError("Buffer checksum not match"));
+        return;
     }
-    // If the data is written into file cache
-    if (_holder && _cur_file_segment != _holder->file_segments.end()) {
-        if (auto s = (*_cur_file_segment)->finalize_write(); !s.ok()) [[unlikely]] {
-            set_val(std::move(s));
-            return;
-        }
+    _upload_to_remote(*this);
+    if (config::enable_flush_file_cache_async) {
+        // If we call is_cancelled() after _state.set_status() then there might be one situation where
+        // s3 file writer is already destructed
+        bool cancelled = is_cancelled();
+        _state.set_status();
+        // this control flow means the buf and the stream shares one memory
+        // so we can directly use buf here
+        upload_to_local_file_cache(cancelled);
+    } else {
+        upload_to_local_file_cache(is_cancelled());
+        _state.set_status();
     }
-    static_cast<void>(S3FileBufferPool::GetInstance()->thread_pool()->submit_func(
-            [buf = this->shared_from_this(), this]() {
-                // to extend buf's lifetime
-                // (void)buf;
-                on_upload();
-            }));
 }
 
 /**
@@ -231,6 +187,7 @@ void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
     if (is_cancelled) {
         return;
     }
+    TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache");
     // the data is already written to S3 in this situation
     // so i didn't handle the file cache write error
     _holder = _alloc_holder();
@@ -244,20 +201,26 @@ void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
         size_t append_size = std::min(data_remain_size, segment_size);
         if (segment->state() == FileBlock::State::EMPTY) {
             if (_index_offset != 0 && segment->range().right >= _index_offset) {
-                // segment->change_cache_type_self(CacheType::INDEX);
+                static_cast<void>(segment->change_cache_type_self(CacheType::INDEX));
             }
             segment->get_or_set_downloader();
             // Another thread may have started downloading due to a query
             // Just skip putting to cache from UploadFileBuffer
             if (segment->is_downloader()) {
-                Slice s(_buffer.get_data() + pos, append_size);
-                if (auto st = segment->append(s); !st.ok()) [[unlikely]] {
-                    LOG_WARNING("append data to cache segmetn failed due to 
{}", st);
-                    return;
+                Slice s(_inner_data->data().get_data() + pos, append_size);
+                Status st = segment->append(s);
+                
TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache_inject",
+                                              &st);
+                if (st.ok()) {
+                    st = segment->finalize_write();
                 }
-                if (auto st = segment->finalize_write(); !st.ok()) [[unlikely]] {
-                    LOG_WARNING("finalize write to cache segmetn failed due to {}", st);
-                    return;
+                if (!st.ok()) {
+                    {
+                        [[maybe_unused]] bool ret = false;
+                        TEST_SYNC_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache",
+                                                 &ret);
+                    }
+                    LOG_WARNING("failed to append data to file 
cache").error(st);
                 }
             }
         }
@@ -287,82 +250,17 @@ FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder(
     return *this;
 }
 
-std::shared_ptr<FileBuffer> FileBufferBuilder::build() {
+Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
     OperationState state(_sync_after_complete_task, _is_cancelled);
+
     if (_type == BufferType::UPLOAD) {
-        return std::make_shared<UploadFileBuffer>(std::move(_upload_cb), std::move(state), _offset,
-                                                  std::move(_alloc_holder_cb), _index_offset);
+        RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
+                                          std::move(_upload_cb), std::move(state), _offset,
+                                          std::move(_alloc_holder_cb), _index_offset));
+        return Status::OK();
     }
     // should never come here
-    return nullptr;
-}
-
-void S3FileBufferPool::reclaim(Slice buf) {
-    {
-        std::unique_lock<std::mutex> lck {_lock};
-        _free_raw_buffers.emplace_back(buf);
-        // only works when not set file cache
-        _cv.notify_all();
-    }
-    s3_file_buffer_allocated << -1;
-}
-
-void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
-                            ThreadPool* thread_pool) {
-    // the nums could be one configuration
-    size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
-    DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
-           (s3_write_buffer_whole_size > s3_write_buffer_size))
-            << "s3 write buffer size " << s3_write_buffer_size << " whole s3 
write buffer size "
-            << s3_write_buffer_whole_size;
-    LOG_INFO("S3 file buffer pool with {} buffers, each with {}", buf_num, 
s3_write_buffer_size);
-    _whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
-    for (size_t i = 0; i < buf_num; i++) {
-        Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
-                 static_cast<size_t>(s3_write_buffer_size)};
-        _free_raw_buffers.emplace_back(s);
-    }
-    _thread_pool = thread_pool;
-}
-
-Slice S3FileBufferPool::allocate(bool reserve) {
-    Slice buf;
-    Defer defer {[&]() {
-        if (!buf.empty()) {
-            s3_file_buffer_allocated << 1;
-        }
-        s3_file_buffer_allocating << -1;
-    }};
-    s3_file_buffer_allocating << 1;
-    // if need reserve or no cache then we must ensure return buf with memory preserved
-    if (reserve || !config::enable_file_cache) {
-        {
-            std::unique_lock<std::mutex> lck {_lock};
-            _cv.wait_for(lck, std::chrono::seconds(config::s3_writer_buffer_allocation_timeout),
-                         [this]() { return !_free_raw_buffers.empty(); });
-            if (!_free_raw_buffers.empty()) {
-                buf = _free_raw_buffers.front();
-                _free_raw_buffers.pop_front();
-            }
-        }
-        return buf;
-    }
-    // try to get one memory reserved buffer
-    {
-        std::unique_lock<std::mutex> lck {_lock};
-        if (!_free_raw_buffers.empty()) {
-            buf = _free_raw_buffers.front();
-            _free_raw_buffers.pop_front();
-        }
-    }
-    if (!buf.empty()) {
-        return buf;
-    }
-    // if there is no free buffer and no need to reserve memory, we could return one empty buffer
-    buf = Slice();
-    // if the buf has no memory reserved, it would try to write the data to file cache first
-    // or it would try to rob buffer from other S3FileBuffer
-    return buf;
+    return Status::InternalError("unsupport buffer type {}", _type);
 }
 } // namespace io
 } // namespace doris
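
One detail worth calling out from this file: append_data() now also accumulates a CRC32C over
every appended slice (crc32c::Extend), and on_upload() recomputes crc32c::Value over the whole
buffer before handing it to the uploader, failing the write with an IOError on mismatch. A small
sketch of that accumulate-then-verify pattern, assuming util/crc32c.h exposes Extend() and Value()
as used in the diff:

    #include <cstdint>
    #include <cstring>

    #include "util/crc32c.h"

    // Accumulate a checksum while copying appended slices into `dst`,
    // then verify the full buffer just before it is uploaded.
    struct ChecksummedBuffer {
        char* dst = nullptr;       // destination buffer, assumed large enough
        uint32_t crc = 0;          // running CRC32C over everything appended
        std::size_t size = 0;

        void append(const char* data, std::size_t n) {
            std::memcpy(dst + size, data, n);
            crc = doris::crc32c::Extend(crc, data, n);
            size += n;
        }
        bool verify() const {
            // a mismatch means the buffer was corrupted in memory after append
            return crc == doris::crc32c::Value(dst, size);
        }
    };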
diff --git a/be/src/io/fs/s3_file_bufferpool.h b/be/src/io/fs/s3_file_bufferpool.h
index 01d31928748..c1bdf08f7ae 100644
--- a/be/src/io/fs/s3_file_bufferpool.h
+++ b/be/src/io/fs/s3_file_bufferpool.h
@@ -27,13 +27,13 @@
 
 #include "common/status.h"
 #include "io/cache/block/block_file_segment.h"
-#include "runtime/exec_env.h"
+#include "util/crc32c.h"
 #include "util/slice.h"
 #include "util/threadpool.h"
 
 namespace doris {
 namespace io {
-enum class BufferType { DOWNLOAD, UPLOAD };
+enum class BufferType : uint32_t { DOWNLOAD, UPLOAD };
 using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
 struct OperationState {
     OperationState(std::function<bool(Status)> sync_after_complete_task,
@@ -45,13 +45,13 @@ struct OperationState {
     *
     * @param S the execution result
     */
-    void set_val(Status s = Status::OK()) {
+    void set_status(Status s = Status::OK()) {
         // make sure we wouldn't sync twice
         if (_value_set) [[unlikely]] {
             return;
         }
         if (nullptr != _sync_after_complete_task) {
-            _fail_after_sync = _sync_after_complete_task(s);
+            _fail_after_sync = _sync_after_complete_task(std::move(s));
         }
         _value_set = true;
     }
@@ -75,36 +75,27 @@ struct OperationState {
     bool _fail_after_sync = false;
 };
 
-struct FileBuffer : public std::enable_shared_from_this<FileBuffer> {
-    FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset,
-               OperationState state, bool reserve = false);
-    virtual ~FileBuffer() { on_finish(); }
+struct FileBuffer {
+    FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset,
+               OperationState state);
+    virtual ~FileBuffer();
     /**
     * submit the correspoding task to async executor
     */
-    virtual void submit() = 0;
+    static Status submit(std::shared_ptr<FileBuffer> buf);
     /**
     * append data to the inner memory buffer
     *
     * @param S the content to be appended
     */
     virtual Status append_data(const Slice& s) = 0;
-    /**
-    * call the reclaim callback when task is done 
-    */
-    void on_finish();
-    /**
-    * swap memory buffer
-    *
-    * @param other which has memory buffer allocated
-    */
-    void swap_buffer(Slice& other);
+    virtual void execute_async() = 0;
     /**
     * set the val of it's operation state
     *
     * @param S the execution result
     */
-    void set_val(Status s) { _state.set_val(s); }
+    void set_status(Status s) { _state.set_status(s); }
     /**
     * get the start offset of this file buffer
     *
@@ -117,6 +108,8 @@ struct FileBuffer : public std::enable_shared_from_this<FileBuffer> {
     * @return the size of the buffered data
     */
     size_t get_size() const { return _size; }
+    size_t get_capacaticy() const { return _capacity; }
+    Slice get_slice() const;
     /**
     * detect whether the execution task is done
     *
@@ -124,11 +117,13 @@ struct FileBuffer : public std::enable_shared_from_this<FileBuffer> {
     */
     bool is_cancelled() const { return _state.is_cancelled(); }
 
+    BufferType _type;
     std::function<FileBlocksHolderPtr()> _alloc_holder;
-    Slice _buffer;
     size_t _offset;
     size_t _size;
     OperationState _state;
+    struct PartData;
+    std::unique_ptr<PartData> _inner_data;
     size_t _capacity;
 };
 
@@ -136,11 +131,10 @@ struct UploadFileBuffer final : public FileBuffer {
     UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, OperationState state,
                      size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder,
                      size_t index_offset)
-            : FileBuffer(alloc_holder, offset, state),
+            : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
               _upload_to_remote(std::move(upload_cb)),
               _index_offset(index_offset) {}
     ~UploadFileBuffer() override = default;
-    void submit() override;
     /**
     * set the index offset
     *
@@ -158,6 +152,8 @@ struct UploadFileBuffer final : public FileBuffer {
     * local file cache
     */
     void upload_to_local_file_cache(bool);
+
+    void execute_async() override { on_upload(); }
     /**
     * do the upload work
     * 1. read from cache if the data is written to cache first
@@ -166,25 +162,7 @@ struct UploadFileBuffer final : public FileBuffer {
     * 4. call the finish callback caller specified
     * 5. reclaim self
     */
-    void on_upload() {
-        if (_buffer.empty()) {
-            read_from_cache();
-        }
-        _upload_to_remote(*this);
-        if (config::enable_flush_file_cache_async) {
-            // If we call is_cancelled() after _state.set_val() then there might one situation where
-            // s3 file writer is already destructed
-            bool cancelled = is_cancelled();
-            _state.set_val();
-            // this control flow means the buf and the stream shares one memory
-            // so we can directly use buf here
-            upload_to_local_file_cache(cancelled);
-        } else {
-            upload_to_local_file_cache(is_cancelled());
-            _state.set_val();
-        }
-        on_finish();
-    }
+    void on_upload();
     /**
     *
     * @return the stream representing the inner memory buffer
@@ -207,6 +185,7 @@ private:
     decltype(_holder->file_segments.begin()) _cur_file_segment;
     size_t _append_offset {0};
     size_t _index_offset {0};
+    uint32_t _crc_value = 0;
 };
 
 struct FileBufferBuilder {
@@ -216,7 +195,7 @@ struct FileBufferBuilder {
     * build one file buffer using previously set properties
     * @return the file buffer's base shared pointer
     */
-    std::shared_ptr<FileBuffer> build();
+    Status build(std::shared_ptr<FileBuffer>* buf);
     /**
     * set the file buffer type
     *
@@ -310,43 +289,5 @@ struct FileBufferBuilder {
     size_t _offset;
     size_t _index_offset;
 };
-
-class S3FileBufferPool {
-public:
-    S3FileBufferPool() = default;
-    ~S3FileBufferPool() = default;
-
-    // should be called one and only once
-    // at startup
-    void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
-              doris::ThreadPool* thread_pool);
-
-    /**
-    *
-    * @return singleton of the S3FileBufferPool
-    */
-    static S3FileBufferPool* GetInstance() {
-        return ExecEnv::GetInstance()->get_s3_file_buffer_pool();
-    }
-
-    void reclaim(Slice buf);
-
-    /**
-    *
-    * @param reserve must return buffer with memory allocated
-    * @return memory buffer
-    */
-    Slice allocate(bool reserve = false);
-
-    ThreadPool* thread_pool() { return _thread_pool; }
-
-private:
-    std::mutex _lock;
-    std::condition_variable _cv;
-    std::unique_ptr<char[]> _whole_mem_buffer;
-    std::list<Slice> _free_raw_buffers;
-    // not owned
-    ThreadPool* _thread_pool = nullptr;
-};
 } // namespace io
 } // namespace doris
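
The header-side counterpart: the virtual submit() is gone, replaced by the static
FileBuffer::submit(std::shared_ptr<FileBuffer>), which keeps the buffer alive by moving the
shared_ptr into the thread-pool closure (replacing the old enable_shared_from_this trick). A
reduced sketch of that ownership hand-off, with a hypothetical Task interface standing in for
FileBuffer:

    #include <functional>
    #include <memory>

    struct Task {
        virtual ~Task() = default;
        virtual void execute_async() = 0;
    };

    // The lambda captures the shared_ptr by value, so the task stays alive
    // until the pool has finished running it, even though the submitter
    // drops its own reference immediately after this call returns.
    void run_async(const std::function<void(std::function<void()>)>& pool_submit,
                   std::shared_ptr<Task> task) {
        pool_submit([t = std::move(task)]() { t->execute_async(); });
    }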
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index dd93bbebe32..dbe5ce8e70c 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -98,6 +98,8 @@ S3FileWriter::S3FileWriter(std::string key, std::shared_ptr<S3FileSystem> fs,
         _cache_key = IFileCache::hash(_path.filename().native());
         _cache = FileCacheFactory::instance()->get_by_path(_cache_key);
     }
+
+    _create_empty_file = opts ? opts->create_empty_file : true;
 }
 
 S3FileWriter::~S3FileWriter() {
@@ -161,7 +163,6 @@ Status S3FileWriter::_abort() {
     }
     // we need to reclaim the memory
     if (_pending_buf) {
-        _pending_buf->on_finish();
         _pending_buf = nullptr;
     }
     LOG(INFO) << "S3FileWriter::abort, path: " << _path.native();
@@ -199,17 +200,46 @@ Status S3FileWriter::close() {
         return _st;
     }
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
-    // it might be one file less than 5MB, we do upload here
-    if (_pending_buf != nullptr) {
-        if (_upload_id.empty()) {
-            auto buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+
+    if (_upload_id.empty()) {
+        if (_pending_buf != nullptr) {
+            // it might be one file less than 5MB, we do upload here
+            auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
             DCHECK(buf != nullptr);
             buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
+        } else if (_create_empty_file) {
+            // if there is no pending buffer, we need to create an empty file
+            auto builder = FileBufferBuilder();
+            builder.set_type(BufferType::UPLOAD)
+                    .set_upload_callback([this](UploadFileBuffer& buf) { _put_object(buf); })
+                    .set_sync_after_complete_task([this](Status s) {
+                        bool ret = false;
+                        if (!s.ok()) [[unlikely]] {
+                            VLOG_NOTICE << "failed at key: " << _key
+                                        << ", status: " << s.to_string();
+                            std::unique_lock<std::mutex> _lck {_completed_lock};
+                            _failed = true;
+                            ret = true;
+                            this->_st = std::move(s);
+                        }
+                        // After the signal, there is a scenario where the previous invocation of _wait_until_finish
+                        // returns to the caller, and subsequently, the S3 file writer is destructed.
+                        // This means that accessing _failed afterwards would result in a heap use after free vulnerability.
+                        _countdown_event.signal();
+                        return ret;
+                    })
+                    .set_is_cancelled([this]() { return _failed.load(); });
+            RETURN_IF_ERROR(builder.build(&_pending_buf));
+            auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+            DCHECK(buf != nullptr);
         }
+    }
+    if (_pending_buf != nullptr) {
         _countdown_event.add_count();
-        _pending_buf->submit();
+        RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
         _pending_buf = nullptr;
     }
+
     DBUG_EXECUTE_IF("s3_file_writer::close", {
         RETURN_IF_ERROR(_complete());
         return Status::InternalError("failed to close s3 file writer");
@@ -273,7 +303,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
                                 return std::make_unique<FileBlocksHolder>(std::move(holder));
                             });
                 }
-                _pending_buf = builder.build();
+                RETURN_IF_ERROR(builder.build(&_pending_buf));
             }
             // we need to make sure all parts except the last one to be 5MB or more
             // and shouldn't be larger than buf
@@ -296,7 +326,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
                 }
                 _cur_part_num++;
                 _countdown_event.add_count();
-                _pending_buf->submit();
+                RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
                 _pending_buf = nullptr;
             }
             _bytes_appended += data_size_to_append;
@@ -334,16 +364,20 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
                     "injected error",
                     _bucket, _path.native(), part_num, _upload_id);
             LOG_WARNING(s.to_string());
-            buf.set_val(s);
+            buf.set_status(std::move(s));
             return;
         }
     });
     if (!upload_part_outcome.IsSuccess()) {
-        _st = s3fs_error(upload_part_outcome.GetError(),
-                         fmt::format("failed to upload part {}, part_num={}, 
upload_id={}",
-                                     _path.native(), part_num, _upload_id));
-        LOG(WARNING) << _st;
-        buf.set_val(_st);
+        auto s = Status::IOError(
+                "failed to upload part (bucket={}, key={}, part_num={}, 
up_load_id={}): {}, "
+                "exception {}, error code {}",
+                _bucket, _path.native(), part_num, _upload_id,
+                upload_part_outcome.GetError().GetMessage(),
+                upload_part_outcome.GetError().GetExceptionName(),
+                upload_part_outcome.GetError().GetResponseCode());
+        LOG_WARNING(s.to_string());
+        buf.set_status(std::move(s));
         return;
     }
     s3_bytes_written_total << buf.get_size();
@@ -431,12 +465,12 @@ Status S3FileWriter::finalize() {
         // if we only need to upload one file less than 5MB, we can just
         // call PutObject to reduce the network IO
         if (_upload_id.empty()) {
-            auto buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+            auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
             DCHECK(buf != nullptr);
             buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
         }
         _countdown_event.add_count();
-        _pending_buf->submit();
+        RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
         _pending_buf = nullptr;
     }
     _wait_until_finish("finalize");
@@ -454,7 +488,7 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
     request.SetContentType("application/octet-stream");
     DBUG_EXECUTE_IF("s3_file_writer::_put_object", {
         _st = Status::InternalError("failed to put object");
-        buf.set_val(_st);
+        buf.set_status(_st);
         LOG(WARNING) << _st;
         return;
     });
@@ -464,7 +498,7 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
         _st = s3fs_error(response.GetError(), fmt::format("failed to put object {}, upload_id={}",
                                                           _path.native(), _upload_id));
         LOG(WARNING) << _st;
-        buf.set_val(_st);
+        buf.set_status(_st);
         return;
     }
     _bytes_written += buf.get_size();
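
A note on the _countdown_event handshake that the empty-file path in close() reuses: each
submitted buffer bumps the count, and the sync_after_complete_task callback records any failure
before signaling, so _wait_until_finish() returns only after every async upload has reported back
(the comment in the diff explains why _failed must not be touched after the signal). A
hypothetical standalone equivalent of that primitive:

    #include <condition_variable>
    #include <mutex>

    // Simplified countdown latch: add_count() before submitting a task,
    // signal() from the task's completion callback, wait() to join them all.
    class SimpleCountdownEvent {
    public:
        void add_count() {
            std::lock_guard<std::mutex> lock(_mutex);
            ++_count;
        }
        void signal() {
            std::lock_guard<std::mutex> lock(_mutex);
            if (--_count <= 0) {
                _cv.notify_all();
            }
        }
        void wait() {
            std::unique_lock<std::mutex> lock(_mutex);
            _cv.wait(lock, [this] { return _count <= 0; });
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        int _count = 0;
    };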
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 991ad4a831b..278b02db4d9 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -707,7 +707,7 @@ Status BaseBetaRowsetWriter::_create_file_writer(std::string path, io::FileWrite
                     _context.file_cache_ttl_sec > 0 && _context.newest_write_timestamp > 0
                             ? _context.newest_write_timestamp + _context.file_cache_ttl_sec
                             : 0,
-    };
+            .create_empty_file = false};
     Status st = fs->create_file(path, &file_writer, &opts);
     if (!st.ok()) {
         LOG(WARNING) << "failed to create writable file. path=" << path << ", 
err: " << st;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 0f36278d120..1f6b1b7e8b9 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -52,7 +52,6 @@ namespace taskgroup {
 class TaskGroupManager;
 }
 namespace io {
-class S3FileBufferPool;
 class FileCacheFactory;
 } // namespace io
 namespace segment_v2 {
@@ -251,7 +250,6 @@ public:
 
     TabletSchemaCache* get_tablet_schema_cache() { return _tablet_schema_cache; }
     StorageEngine* get_storage_engine() { return _storage_engine; }
-    io::S3FileBufferPool* get_s3_file_buffer_pool() { return _s3_buffer_pool; }
     SchemaCache* schema_cache() { return _schema_cache; }
     StoragePageCache* get_storage_page_cache() { return _storage_page_cache; }
     SegmentLoader* segment_loader() { return _segment_loader; }
@@ -364,7 +362,6 @@ private:
     // these redundancy header could introduce potential bug, at least, more header means slow compile.
     // So we choose to use raw pointer, please remember to delete these pointer in deconstructor.
     TabletSchemaCache* _tablet_schema_cache = nullptr;
-    io::S3FileBufferPool* _s3_buffer_pool = nullptr;
     StorageEngine* _storage_engine = nullptr;
     SchemaCache* _schema_cache = nullptr;
     StoragePageCache* _storage_page_cache = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index c7278af9be2..f5f08a71064 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -253,11 +253,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
     _tablet_schema_cache =
             TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity);
 
-    // S3 buffer pool
-    _s3_buffer_pool = new io::S3FileBufferPool();
-    _s3_buffer_pool->init(config::s3_write_buffer_whole_size, config::s3_write_buffer_size,
-                          this->s3_file_upload_thread_pool());
-
     // Storage engine
     doris::EngineOptions options;
     options.store_paths = store_paths;
@@ -552,7 +547,6 @@ void ExecEnv::destroy() {
     SAFE_SHUTDOWN(_lazy_release_obj_pool);
     SAFE_SHUTDOWN(_send_report_thread_pool);
     SAFE_SHUTDOWN(_send_batch_thread_pool);
-    SAFE_DELETE(_s3_buffer_pool);
     _deregister_metrics();
     SAFE_DELETE(_load_channel_mgr);
 
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp
index bf3d09cdda6..d3228d33680 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -141,6 +141,7 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
             FileFactory::convert_storage_type(_storage_type), _state->exec_env(),
             _file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0,
             _file_writer_impl));
+    RETURN_IF_ERROR(_file_writer_impl->open());
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
         _vfile_writer.reset(new VCSVTransformer(_state, _file_writer_impl.get(),
diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp
index b37d358f657..5ff1cc5e48a 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -30,7 +30,6 @@
 #include "io/fs/file_reader.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
-#include "io/fs/s3_file_bufferpool.h"
 #include "io/fs/s3_file_system.h"
 #include "io/io_common.h"
 #include "runtime/exec_env.h"
@@ -69,10 +68,6 @@ public:
                                   .build(&_s3_file_upload_thread_pool));
         ExecEnv::GetInstance()->_s3_file_upload_thread_pool =
                 std::move(_s3_file_upload_thread_pool);
-        ExecEnv::GetInstance()->_s3_buffer_pool = new io::S3FileBufferPool();
-        io::S3FileBufferPool::GetInstance()->init(
-                config::s3_write_buffer_whole_size, config::s3_write_buffer_size,
-                ExecEnv::GetInstance()->_s3_file_upload_thread_pool.get());
     }
 
     static void TearDownTestSuite() {
@@ -81,8 +76,6 @@ public:
         }
         ExecEnv::GetInstance()->_s3_file_upload_thread_pool->shutdown();
         ExecEnv::GetInstance()->_s3_file_upload_thread_pool = nullptr;
-        delete ExecEnv::GetInstance()->_s3_buffer_pool;
-        ExecEnv::GetInstance()->_s3_buffer_pool = nullptr;
     }
 
     void SetUp() override {
diff --git a/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
new file mode 100644
index 00000000000..260c177d310
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_base1 --
+
+-- !select_tvf1 --
+
+-- !select_tvf2 --
+
+-- !select_tvf3 --
+
diff --git a/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
new file mode 100644
index 00000000000..1804fff2a11
--- /dev/null
+++ b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_outfile_empty_data", "external,hive,tvf,external_docker") {
+
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("diable Hive test.")
+        return;
+    }
+
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    // use to outfile to hdfs
+    String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    // It's okay to use random `hdfsUser`, but can not be empty.
+    def hdfsUserName = "doris"
+    def format = "csv"
+    def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"
+
+    // use to outfile to s3
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    // broker
+    String broker_name = "hdfs"
+
+    def export_table_name = "outfile_empty_data_test"
+
+    def create_table = {table_name, column_define ->
+        sql """ DROP TABLE IF EXISTS ${table_name} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (
+            ${column_define}
+            )
+            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+    }
+
+    def outfile_to_HDFS_directly = {
+        // select ... into outfile ...
+        def uuid = UUID.randomUUID().toString()
+
+        hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
+        uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"
+
+        def res = sql """
+            SELECT * FROM ${export_table_name} t ORDER BY user_id
+            INTO OUTFILE "${uri}"
+            FORMAT AS ${format}
+            PROPERTIES (
+                "fs.defaultFS"="${defaultFS}",
+                "hadoop.username" = "${hdfsUserName}"
+            );
+        """
+        logger.info("outfile to hdfs direct success path: " + res[0][3]);
+        return res[0][3]
+    }
+
+    def outfile_to_HDFS_with_broker = {
+        // select ... into outfile ...
+        def uuid = UUID.randomUUID().toString()
+
+        hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
+        uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"
+
+        def res = sql """
+            SELECT * FROM ${export_table_name} t ORDER BY user_id
+            INTO OUTFILE "${uri}"
+            FORMAT AS ${format}
+            PROPERTIES (
+                "broker.fs.defaultFS"="${defaultFS}",
+                "broker.name"="hdfs",
+                "broker.username" = "${hdfsUserName}"
+            );
+        """
+        logger.info("outfile to hdfs with broker success path: " + res[0][3]);
+        return res[0][3]
+    }
+
+    def outfile_to_S3_directly = {
+        // select ... into outfile ...
+        s3_outfile_path = "${bucket}/outfile/csv/test-outfile-empty/"
+        uri = "s3://${s3_outfile_path}/exp_"
+
+        def res = sql """
+            SELECT * FROM ${export_table_name} t ORDER BY user_id
+            INTO OUTFILE "${uri}"
+            FORMAT AS csv
+            PROPERTIES (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}"
+            );
+        """
+        logger.info("outfile to s3 success path: " + res[0][3]);
+        return res[0][3]
+    }
+
+    try {
+        def doris_column_define = """
+                                    `user_id` INT NOT NULL COMMENT "user id",
+                                    `name` STRING NULL,
+                                    `age` INT NULL"""
+        // create table
+        create_table(export_table_name, doris_column_define);
+        // test outfile empty data to hdfs directly
+        def outfile_to_hdfs_directly_url = outfile_to_HDFS_directly()
+        // test outfile empty data to hdfs with broker
+        def outfile_to_hdfs_with_broker_url = outfile_to_HDFS_with_broker()
+        // test outfile empty data to s3 directly
+        def outfile_to_s3_directly_url = outfile_to_S3_directly()
+        qt_select_base1 """ SELECT * FROM ${export_table_name} ORDER BY user_id; """
+
+        qt_select_tvf1 """ select * from HDFS(
+                    "uri" = "${outfile_to_hdfs_directly_url}0.csv",
+                    "hadoop.username" = "${hdfsUserName}",
+                    "format" = "${format}");
+                    """
+
+        qt_select_tvf2 """ select * from HDFS(
+                    "uri" = "${outfile_to_hdfs_with_broker_url}0.csv",
+                    "hadoop.username" = "${hdfsUserName}",
+                    "format" = "${format}");
+                    """
+        
+        qt_select_tvf3 """ SELECT * FROM S3 (
+                "uri" = 
"http://${s3_endpoint}${outfile_to_s3_directly_url.substring(4, 
outfile_to_s3_directly_url.length())}0.csv",
+                "ACCESS_KEY"= "${ak}",
+                "SECRET_KEY" = "${sk}",
+                "format" = "${format}",
+                "region" = "${region}",
+                "use_path_style" = "true"
+            );
+            """
+
+    } finally {
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
