This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a8d8c6a2711 [fix](file-writer) opt s3 file writer and fix empty file related issue #28983 #30703 #31169 (#31213)
a8d8c6a2711 is described below
commit a8d8c6a2711ac18951ee7d8384de9588c62902a7
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Feb 21 16:48:54 2024 +0800
[fix](file-writer) opt s3 file writer and fix empty file related issue #28983 #30703 #31169 (#31213)
* (feature)(cloud) Use dynamic allocator instead of static buffer pool for
better elasticity. (#28983)
* [fix](outfile) Fix unable to export empty data (#30703)
Issue Number: close #30600
Fix being unable to export empty data to HDFS/S3. This behavior is inconsistent
with version 1.2.7, which can export empty data to HDFS/S3 and still produces
exported files on S3/HDFS.
* [fix](file-writer) avoid empty file for segment writer (#31169)
---------
Co-authored-by: AlexYue <[email protected]>
Co-authored-by: zxealous <[email protected]>
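For reviewers skimming the diff below, here is a hedged, hypothetical caller-side sketch of how the
new FileWriterOptions::create_empty_file flag and the FileWriter::open() hook added in
be/src/io/fs/file_writer.h are meant to work together. The function name, includes and control flow
are illustrative assumptions; only create_file(), open(), close() and the option itself come from
this patch.

    // Hypothetical usage sketch (not part of this commit); it assumes the declarations
    // added below in file_writer.h plus the usual Doris Status/RETURN_IF_ERROR macros.
    #include "common/status.h"
    #include "io/fs/file_system.h"
    #include "io/fs/file_writer.h"

    doris::Status export_result_set(doris::io::FileSystemSPtr fs, const doris::io::Path& path) {
        doris::io::FileWriterOptions opts;
        opts.create_empty_file = true; // result sinks keep the default so an empty export still yields a file
        doris::io::FileWriterPtr writer;
        RETURN_IF_ERROR(fs->create_file(path, &writer, &opts));
        RETURN_IF_ERROR(writer->open()); // opens eagerly only when create_empty_file is set
        // appendv() may never be called when the result set is empty
        return writer->close();
    }

Segment writers take the opposite route: beta_rowset_writer.cpp passes create_empty_file = false so a
segment that receives no data does not leave an empty object behind.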
---
be/src/common/config.cpp | 4 -
be/src/common/config.h | 4 -
be/src/io/fs/benchmark/fs_benchmark_tool.cpp | 2 -
be/src/io/fs/broker_file_system.cpp | 2 +-
be/src/io/fs/broker_file_writer.cpp | 15 +-
be/src/io/fs/broker_file_writer.h | 3 +-
be/src/io/fs/file_writer.h | 6 +
be/src/io/fs/hdfs_file_system.cpp | 4 +-
be/src/io/fs/hdfs_file_writer.cpp | 12 +-
be/src/io/fs/hdfs_file_writer.h | 3 +-
be/src/io/fs/s3_file_bufferpool.cpp | 340 ++++++++-------------
be/src/io/fs/s3_file_bufferpool.h | 103 ++-----
be/src/io/fs/s3_file_writer.cpp | 70 +++--
be/src/olap/rowset/beta_rowset_writer.cpp | 2 +-
be/src/runtime/exec_env.h | 3 -
be/src/runtime/exec_env_init.cpp | 6 -
be/src/vec/sink/writer/vfile_result_writer.cpp | 1 +
be/test/io/fs/s3_file_writer_test.cpp | 7 -
.../outfile/csv/test_outfile_empty_data.out | 9 +
.../outfile/csv/test_outfile_empty_data.groovy | 166 ++++++++++
20 files changed, 407 insertions(+), 355 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ad8f2aece43..709526dbf42 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1040,10 +1040,6 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000");
DEFINE_mInt64(row_column_page_size, "4096");
// it must be larger than or equal to 5MB
DEFINE_mInt32(s3_write_buffer_size, "5242880");
-// the size of the whole s3 buffer pool, which indicates the s3 file writer
-// can at most buffer 50MB data. And the num of multi part upload task is
-// s3_write_buffer_whole_size / s3_write_buffer_size
-DEFINE_mInt32(s3_write_buffer_whole_size, "524288000");
// The timeout config for S3 buffer allocation
DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e75d6d472a8..e0e5c52e30c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1077,10 +1077,6 @@ DECLARE_mInt32(tablet_path_check_batch_size);
DECLARE_mInt64(row_column_page_size);
// it must be larger than or equal to 5MB
DECLARE_mInt32(s3_write_buffer_size);
-// the size of the whole s3 buffer pool, which indicates the s3 file writer
-// can at most buffer 50MB data. And the num of multi part upload task is
-// s3_write_buffer_whole_size / s3_write_buffer_size
-DECLARE_mInt32(s3_write_buffer_whole_size);
// The timeout config for S3 buffer allocation
DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
// the max number of cached file handle for block segemnt
diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
index 0ca9edc530a..9b7ec178d77 100644
--- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
+++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -119,8 +119,6 @@ int main(int argc, char** argv) {
.set_min_threads(num_cores)
.set_max_threads(num_cores)
.build(&s3_file_upload_thread_pool));
- doris::io::S3FileBufferPool* s3_buffer_pool =
doris::io::S3FileBufferPool::GetInstance();
- s3_buffer_pool->init(524288000, 5242880, s3_file_upload_thread_pool.get());
try {
doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation,
std::stoi(FLAGS_threads),
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
index 576a7dd8959..44582a0ff8d 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() {
Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr*
writer,
const FileWriterOptions* opts) {
*writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(),
_broker_addr, _broker_prop,
- path, 0 /* offset */,
getSPtr());
+ path, 0 /* offset */,
getSPtr(), opts);
return Status::OK();
}
diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp
index d5b2baa7a66..75acf40084c 100644
--- a/be/src/io/fs/broker_file_writer.cpp
+++ b/be/src/io/fs/broker_file_writer.cpp
@@ -37,12 +37,15 @@ namespace io {
BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress&
broker_address,
const std::map<std::string, std::string>&
properties,
- const std::string& path, int64_t
start_offset, FileSystemSPtr fs)
+ const std::string& path, int64_t
start_offset, FileSystemSPtr fs,
+ const FileWriterOptions* opts)
: FileWriter(path, fs),
_env(env),
_address(broker_address),
_properties(properties),
- _cur_offset(start_offset) {}
+ _cur_offset(start_offset) {
+ _create_empty_file = opts ? opts->create_empty_file : true;
+}
BrokerFileWriter::~BrokerFileWriter() {
if (_opened) {
@@ -153,6 +156,14 @@ Status BrokerFileWriter::finalize() {
return Status::OK();
}
+Status BrokerFileWriter::open() {
+ if (_create_empty_file && !_opened) {
+ RETURN_IF_ERROR(_open());
+ _opened = true;
+ }
+ return Status::OK();
+}
+
Status BrokerFileWriter::_open() {
TBrokerOpenWriterRequest request;
diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h
index cf5b8013acb..05b62846e6c 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -42,9 +42,10 @@ class BrokerFileWriter : public FileWriter {
public:
BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
const std::map<std::string, std::string>& properties,
const std::string& path,
- int64_t start_offset, FileSystemSPtr fs);
+ int64_t start_offset, FileSystemSPtr fs, const
FileWriterOptions* opts);
virtual ~BrokerFileWriter();
+ Status open() override;
Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 58c9c9ff060..bb3235e7d27 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -35,6 +35,8 @@ struct FileWriterOptions {
bool is_cold_data = false;
bool sync_file_data = true; // Whether flush data into storage
system
int64_t file_cache_expiration = 0; // Absolute time
+ // Whether to create empty file if no content
+ bool create_empty_file = true;
};
class FileWriter {
@@ -46,6 +48,9 @@ public:
DISALLOW_COPY_AND_ASSIGN(FileWriter);
+ // Open the file for writing.
+ virtual Status open() { return Status::OK(); }
+
// Normal close. Wait for all data to persist before returning.
virtual Status close() = 0;
@@ -74,6 +79,7 @@ protected:
std::shared_ptr<FileSystem> _fs;
bool _closed = false;
bool _opened = false;
+ bool _create_empty_file = true;
};
using FileWriterPtr = std::unique_ptr<FileWriter>;
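The Broker writer above and the HDFS writer below adopt this flag with the same small pattern.
Condensed into one hypothetical subclass (the class name and the _open() body are placeholders;
the member names come from the FileWriter changes just above):

    // Condensed sketch of the pattern used by BrokerFileWriter / HdfsFileWriter in this patch.
    class ExampleFileWriter : public doris::io::FileWriter {
    public:
        ExampleFileWriter(doris::io::Path path, doris::io::FileSystemSPtr fs,
                          const doris::io::FileWriterOptions* opts)
                : FileWriter(std::move(path), fs) {
            // opts may be null, in which case the old behavior (always create the file) is kept
            _create_empty_file = opts ? opts->create_empty_file : true;
        }
        doris::Status open() override {
            // create the underlying file eagerly only when an empty output file is wanted
            if (_create_empty_file && !_opened) {
                RETURN_IF_ERROR(_open());
                _opened = true;
            }
            return doris::Status::OK();
        }
    private:
        doris::Status _open(); // the storage-specific create/open call
    };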
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index 8ada4b92acc..a65784226f3 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -167,8 +167,8 @@ Status HdfsFileSystem::connect_impl() {
}
Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr*
writer,
- const FileWriterOptions*) {
- *writer = std::make_unique<HdfsFileWriter>(file, getSPtr());
+ const FileWriterOptions* opts) {
+ *writer = std::make_unique<HdfsFileWriter>(file, getSPtr(), opts);
return Status::OK();
}
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index 00081db310f..40c3c59dcd7 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -34,7 +34,9 @@
namespace doris {
namespace io {
-HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) :
FileWriter(std::move(file), fs) {
+HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const
FileWriterOptions* opts)
+ : FileWriter(std::move(file), fs) {
+ _create_empty_file = opts ? opts->create_empty_file : true;
_hdfs_fs = (HdfsFileSystem*)_fs.get();
}
@@ -103,6 +105,14 @@ Status HdfsFileWriter::finalize() {
return Status::OK();
}
+Status HdfsFileWriter::open() {
+ if (_create_empty_file && !_opened) {
+ RETURN_IF_ERROR(_open());
+ _opened = true;
+ }
+ return Status::OK();
+}
+
Status HdfsFileWriter::_open() {
_path = convert_path(_path, _hdfs_fs->_fs_name);
std::string hdfs_dir = _path.parent_path().string();
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index c05f7625124..21dcaff1cd6 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -33,9 +33,10 @@ class HdfsFileSystem;
class HdfsFileWriter : public FileWriter {
public:
- HdfsFileWriter(Path file, FileSystemSPtr fs);
+ HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions*
opts);
~HdfsFileWriter();
+ Status open() override;
Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
diff --git a/be/src/io/fs/s3_file_bufferpool.cpp b/be/src/io/fs/s3_file_bufferpool.cpp
index a4f75319ec0..4efbd439429 100644
--- a/be/src/io/fs/s3_file_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_bufferpool.cpp
@@ -17,50 +17,73 @@
#include "s3_file_bufferpool.h"
+#include <chrono>
+#include <memory>
+
#include "common/config.h"
+#include "common/exception.h"
#include "common/logging.h"
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "io/cache/block/block_file_cache_fwd.h"
#include "io/cache/block/block_file_segment.h"
#include "io/fs/s3_common.h"
#include "runtime/exec_env.h"
#include "util/defer_op.h"
#include "util/slice.h"
+#include "vec/common/arena.h"
namespace doris {
namespace io {
bvar::Adder<uint64_t> s3_file_buffer_allocated("s3_file_buffer_allocated");
-bvar::Adder<uint64_t> s3_file_buffer_allocating("s3_file_buffer_allocating");
-/**
- * 0. check if the inner memory buffer is empty or not
- * 1. relcaim the memory buffer if it's mot empty
- */
-void FileBuffer::on_finish() {
- if (_buffer.empty()) {
- return;
+template <typename Allocator = Allocator<false>>
+struct Memory : boost::noncopyable, Allocator {
+ Memory() = default;
+ explicit Memory(size_t size) : _size(size) {
+ alloc(size);
+ s3_file_buffer_allocated << 1;
}
- S3FileBufferPool::GetInstance()->reclaim(Slice {_buffer.get_data(),
_capacity});
- _buffer.clear();
-}
+ ~Memory() {
+ dealloc();
+ s3_file_buffer_allocated << -1;
+ }
+ void alloc(size_t size) { _data =
static_cast<char*>(Allocator::alloc(size, 0)); }
+ void dealloc() {
+ if (_data == nullptr) {
+ return;
+ }
+ Allocator::free(_data, _size);
+ _data = nullptr;
+ }
+ size_t _size;
+ char* _data;
+};
-/**
- * take other buffer's memory space and refresh capacity
- */
-void FileBuffer::swap_buffer(Slice& other) {
- _buffer = other;
- _capacity = _buffer.get_size();
- other.clear();
+struct FileBuffer::PartData {
+ Memory<> _memory;
+ PartData() : _memory(config::s3_write_buffer_size) {}
+ ~PartData() = default;
+ [[nodiscard]] Slice data() const { return Slice {_memory._data,
_memory._size}; }
+ [[nodiscard]] size_t size() const { return _memory._size; }
+};
+
+Slice FileBuffer::get_slice() const {
+ return _inner_data->data();
}
-FileBuffer::FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder,
size_t offset,
- OperationState state, bool reserve)
- : _alloc_holder(std::move(alloc_holder)),
- _buffer(S3FileBufferPool::GetInstance()->allocate(reserve)),
+FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()>
alloc_holder,
+ size_t offset, OperationState state)
+ : _type(type),
+ _alloc_holder(std::move(alloc_holder)),
_offset(offset),
_size(0),
_state(std::move(state)),
- _capacity(_buffer.get_size()) {}
+ _inner_data(std::make_unique<FileBuffer::PartData>()),
+ _capacity(_inner_data->size()) {}
+FileBuffer::~FileBuffer() = default;
/**
* 0. check if file cache holder allocated
* 1. update the cache's type to index cache
@@ -86,136 +109,69 @@ void UploadFileBuffer::set_index_offset(size_t offset) {
* 1. write to file cache otherwise, then we'll wait for free buffer and to
rob it
*/
Status UploadFileBuffer::append_data(const Slice& data) {
- Defer defer {[&] { _size += data.get_size(); }};
- while (true) {
- // if buf is not empty, it means there is memory preserved for this buf
- if (!_buffer.empty()) {
- std::memcpy((void*)(_buffer.get_data() + _size), data.get_data(),
data.get_size());
- break;
- }
- // if the buf has no memory reserved, then write to disk first
- if (!_is_cache_allocated && config::enable_file_cache && _alloc_holder
!= nullptr) {
- _holder = _alloc_holder();
- bool cache_is_not_enough = false;
- for (auto& segment : _holder->file_segments) {
- DCHECK(segment->state() == FileBlock::State::SKIP_CACHE ||
- segment->state() == FileBlock::State::EMPTY);
- if (segment->state() == FileBlock::State::SKIP_CACHE)
[[unlikely]] {
- cache_is_not_enough = true;
- break;
- }
- if (_index_offset != 0) {
-
RETURN_IF_ERROR(segment->change_cache_type_self(CacheType::INDEX));
- }
- }
- // if cache_is_not_enough, cannot use it !
- _cur_file_segment = _holder->file_segments.begin();
- _append_offset = (*_cur_file_segment)->range().left;
- _holder = cache_is_not_enough ? nullptr : std::move(_holder);
- if (_holder) {
- (*_cur_file_segment)->get_or_set_downloader();
- }
- _is_cache_allocated = true;
- }
- if (_holder) [[likely]] {
- size_t data_remain_size = data.get_size();
- size_t pos = 0;
- while (data_remain_size != 0) {
- auto range = (*_cur_file_segment)->range();
- size_t segment_remain_size = range.right - _append_offset + 1;
- size_t append_size = std::min(data_remain_size,
segment_remain_size);
- Slice append_data(data.get_data() + pos, append_size);
- // When there is no available free memory buffer, the data
will be written to the cache first
- // and then uploaded to S3 when there is an available free
memory buffer.
- // However, if an error occurs during the write process to the
local cache,
- // continuing to upload the dirty data from the cache to S3
will result in erroneous data(Bad segment).
- // Considering that local disk write failures are rare, a
simple approach is chosen here,
- // which is to treat the import as a failure directly when a
local write failure occurs
- RETURN_IF_ERROR((*_cur_file_segment)->append(append_data));
- if (segment_remain_size == append_size) {
- RETURN_IF_ERROR((*_cur_file_segment)->finalize_write());
- if (++_cur_file_segment != _holder->file_segments.end()) {
- (*_cur_file_segment)->get_or_set_downloader();
- }
- }
- data_remain_size -= append_size;
- _append_offset += append_size;
- pos += append_size;
- }
- break;
- } else {
- // wait allocate buffer pool
- auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
- if (tmp.empty()) [[unlikely]] {
- return Status::InternalError("Failed to allocate S3 buffer for
{} seconds",
-
config::s3_writer_buffer_allocation_timeout);
- }
- swap_buffer(tmp);
- }
- }
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::append_data",
Status::OK());
+ std::memcpy((void*)(_inner_data->data().get_data() + _size),
data.get_data(), data.get_size());
+ _size += data.get_size();
+ _crc_value = crc32c::Extend(_crc_value, data.get_data(), data.get_size());
return Status::OK();
}
/**
- * 0. allocate one memory buffer
- * 1. read the content from the cache and then write
- * it into memory buffer
+ * 0. construct the stream ptr if the buffer is not empty
+ * 1. submit the on_upload() callback to executor
*/
-void UploadFileBuffer::read_from_cache() {
- auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
- if (tmp.empty()) [[unlikely]] {
- set_val(Status::InternalError("Failed to allocate S3 buffer for {}
seconds",
-
config::s3_writer_buffer_allocation_timeout));
- return;
- }
- swap_buffer(tmp);
+static Status submit_upload_buffer(std::shared_ptr<FileBuffer> buffer) {
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::submit",
Status::OK(), buffer.get());
+ return ExecEnv::GetInstance()->s3_file_upload_thread_pool()->submit_func(
+ [buf = std::move(buffer)]() { buf->execute_async(); });
+}
- DCHECK(_holder != nullptr);
- DCHECK(_capacity >= _size);
- size_t pos = 0;
- for (auto& segment : _holder->file_segments) {
- if (pos == _size) {
- break;
- }
- if (auto s = segment->finalize_write(); !s.ok()) [[unlikely]] {
- set_val(std::move(s));
- return;
- }
- size_t segment_size = segment->range().size();
- Slice s(_buffer.get_data() + pos, segment_size);
- if (auto st = segment->read_at(s, 0); !st.ok()) [[unlikely]] {
- set_val(std::move(st));
- return;
- }
- pos += segment_size;
+std::ostream& operator<<(std::ostream& os, const BufferType& value) {
+ switch (value) {
+ case BufferType::UPLOAD:
+ os << "upload";
+ break;
+ case BufferType::DOWNLOAD:
+ os << "download";
+ break;
+ default:
+ auto cast_value = static_cast<uint32_t>(value);
+ os << cast_value;
}
+ return os;
+}
- // the real lenght should be the buf.get_size() in this situation(consider
it's the last part,
- // size of it could be less than 5MB)
- _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(),
_size);
+Status FileBuffer::submit(std::shared_ptr<FileBuffer> buf) {
+ switch (buf->_type) {
+ case BufferType::UPLOAD:
+ return submit_upload_buffer(std::move(buf));
+ break;
+ default:
+ CHECK(false) << "should never come here, the illegal type is " <<
buf->_type;
+ };
+ return Status::InternalError("should never come here");
}
-/**
- * 0. constrcut the stream ptr if the buffer is not empty
- * 1. submit the on_upload() callback to executor
- */
-void UploadFileBuffer::submit() {
- if (!_buffer.empty()) [[likely]] {
- _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(),
_size);
+void UploadFileBuffer::on_upload() {
+ _stream_ptr =
std::make_shared<StringViewStream>(_inner_data->data().get_data(), _size);
+ if (_crc_value != crc32c::Value(_inner_data->data().get_data(), _size)) {
+ DCHECK(false);
+ set_status(Status::IOError("Buffer checksum not match"));
+ return;
}
- // If the data is written into file cache
- if (_holder && _cur_file_segment != _holder->file_segments.end()) {
- if (auto s = (*_cur_file_segment)->finalize_write(); !s.ok())
[[unlikely]] {
- set_val(std::move(s));
- return;
- }
+ _upload_to_remote(*this);
+ if (config::enable_flush_file_cache_async) {
+ // If we call is_cancelled() after _state.set_status() then there
might one situation where
+ // s3 file writer is already destructed
+ bool cancelled = is_cancelled();
+ _state.set_status();
+ // this control flow means the buf and the stream shares one memory
+ // so we can directly use buf here
+ upload_to_local_file_cache(cancelled);
+ } else {
+ upload_to_local_file_cache(is_cancelled());
+ _state.set_status();
}
-
static_cast<void>(S3FileBufferPool::GetInstance()->thread_pool()->submit_func(
- [buf = this->shared_from_this(), this]() {
- // to extend buf's lifetime
- // (void)buf;
- on_upload();
- }));
}
/**
@@ -231,6 +187,7 @@ void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
if (is_cancelled) {
return;
}
+
TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache");
// the data is already written to S3 in this situation
// so i didn't handle the file cache write error
_holder = _alloc_holder();
@@ -244,20 +201,26 @@ void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
size_t append_size = std::min(data_remain_size, segment_size);
if (segment->state() == FileBlock::State::EMPTY) {
if (_index_offset != 0 && segment->range().right >= _index_offset)
{
- // segment->change_cache_type_self(CacheType::INDEX);
+
static_cast<void>(segment->change_cache_type_self(CacheType::INDEX));
}
segment->get_or_set_downloader();
// Another thread may have started downloading due to a query
// Just skip putting to cache from UploadFileBuffer
if (segment->is_downloader()) {
- Slice s(_buffer.get_data() + pos, append_size);
- if (auto st = segment->append(s); !st.ok()) [[unlikely]] {
- LOG_WARNING("append data to cache segmetn failed due to
{}", st);
- return;
+ Slice s(_inner_data->data().get_data() + pos, append_size);
+ Status st = segment->append(s);
+
TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache_inject",
+ &st);
+ if (st.ok()) {
+ st = segment->finalize_write();
}
- if (auto st = segment->finalize_write(); !st.ok())
[[unlikely]] {
- LOG_WARNING("finalize write to cache segmetn failed due to
{}", st);
- return;
+ if (!st.ok()) {
+ {
+ [[maybe_unused]] bool ret = false;
+
TEST_SYNC_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache",
+ &ret);
+ }
+ LOG_WARNING("failed to append data to file
cache").error(st);
}
}
}
@@ -287,82 +250,17 @@ FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder(
return *this;
}
-std::shared_ptr<FileBuffer> FileBufferBuilder::build() {
+Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
OperationState state(_sync_after_complete_task, _is_cancelled);
+
if (_type == BufferType::UPLOAD) {
- return std::make_shared<UploadFileBuffer>(std::move(_upload_cb),
std::move(state), _offset,
- std::move(_alloc_holder_cb),
_index_offset);
+ RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
+ std::move(_upload_cb),
std::move(state), _offset,
+ std::move(_alloc_holder_cb),
_index_offset));
+ return Status::OK();
}
// should never come here
- return nullptr;
-}
-
-void S3FileBufferPool::reclaim(Slice buf) {
- {
- std::unique_lock<std::mutex> lck {_lock};
- _free_raw_buffers.emplace_back(buf);
- // only works when not set file cache
- _cv.notify_all();
- }
- s3_file_buffer_allocated << -1;
-}
-
-void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t
s3_write_buffer_size,
- ThreadPool* thread_pool) {
- // the nums could be one configuration
- size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
- DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
- (s3_write_buffer_whole_size > s3_write_buffer_size))
- << "s3 write buffer size " << s3_write_buffer_size << " whole s3
write buffer size "
- << s3_write_buffer_whole_size;
- LOG_INFO("S3 file buffer pool with {} buffers, each with {}", buf_num,
s3_write_buffer_size);
- _whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
- for (size_t i = 0; i < buf_num; i++) {
- Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
- static_cast<size_t>(s3_write_buffer_size)};
- _free_raw_buffers.emplace_back(s);
- }
- _thread_pool = thread_pool;
-}
-
-Slice S3FileBufferPool::allocate(bool reserve) {
- Slice buf;
- Defer defer {[&]() {
- if (!buf.empty()) {
- s3_file_buffer_allocated << 1;
- }
- s3_file_buffer_allocating << -1;
- }};
- s3_file_buffer_allocating << 1;
- // if need reserve or no cache then we must ensure return buf with memory
preserved
- if (reserve || !config::enable_file_cache) {
- {
- std::unique_lock<std::mutex> lck {_lock};
- _cv.wait_for(lck,
std::chrono::seconds(config::s3_writer_buffer_allocation_timeout),
- [this]() { return !_free_raw_buffers.empty(); });
- if (!_free_raw_buffers.empty()) {
- buf = _free_raw_buffers.front();
- _free_raw_buffers.pop_front();
- }
- }
- return buf;
- }
- // try to get one memory reserved buffer
- {
- std::unique_lock<std::mutex> lck {_lock};
- if (!_free_raw_buffers.empty()) {
- buf = _free_raw_buffers.front();
- _free_raw_buffers.pop_front();
- }
- }
- if (!buf.empty()) {
- return buf;
- }
- // if there is no free buffer and no need to reserve memory, we could
return one empty buffer
- buf = Slice();
- // if the buf has no memory reserved, it would try to write the data to
file cache first
- // or it would try to rob buffer from other S3FileBuffer
- return buf;
+ return Status::InternalError("unsupport buffer type {}", _type);
}
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/s3_file_bufferpool.h b/be/src/io/fs/s3_file_bufferpool.h
index 01d31928748..c1bdf08f7ae 100644
--- a/be/src/io/fs/s3_file_bufferpool.h
+++ b/be/src/io/fs/s3_file_bufferpool.h
@@ -27,13 +27,13 @@
#include "common/status.h"
#include "io/cache/block/block_file_segment.h"
-#include "runtime/exec_env.h"
+#include "util/crc32c.h"
#include "util/slice.h"
#include "util/threadpool.h"
namespace doris {
namespace io {
-enum class BufferType { DOWNLOAD, UPLOAD };
+enum class BufferType : uint32_t { DOWNLOAD, UPLOAD };
using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
struct OperationState {
OperationState(std::function<bool(Status)> sync_after_complete_task,
@@ -45,13 +45,13 @@ struct OperationState {
*
* @param S the execution result
*/
- void set_val(Status s = Status::OK()) {
+ void set_status(Status s = Status::OK()) {
// make sure we wouldn't sync twice
if (_value_set) [[unlikely]] {
return;
}
if (nullptr != _sync_after_complete_task) {
- _fail_after_sync = _sync_after_complete_task(s);
+ _fail_after_sync = _sync_after_complete_task(std::move(s));
}
_value_set = true;
}
@@ -75,36 +75,27 @@ struct OperationState {
bool _fail_after_sync = false;
};
-struct FileBuffer : public std::enable_shared_from_this<FileBuffer> {
- FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t
offset,
- OperationState state, bool reserve = false);
- virtual ~FileBuffer() { on_finish(); }
+struct FileBuffer {
+ FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()>
alloc_holder, size_t offset,
+ OperationState state);
+ virtual ~FileBuffer();
/**
* submit the correspoding task to async executor
*/
- virtual void submit() = 0;
+ static Status submit(std::shared_ptr<FileBuffer> buf);
/**
* append data to the inner memory buffer
*
* @param S the content to be appended
*/
virtual Status append_data(const Slice& s) = 0;
- /**
- * call the reclaim callback when task is done
- */
- void on_finish();
- /**
- * swap memory buffer
- *
- * @param other which has memory buffer allocated
- */
- void swap_buffer(Slice& other);
+ virtual void execute_async() = 0;
/**
* set the val of it's operation state
*
* @param S the execution result
*/
- void set_val(Status s) { _state.set_val(s); }
+ void set_status(Status s) { _state.set_status(s); }
/**
* get the start offset of this file buffer
*
@@ -117,6 +108,8 @@ struct FileBuffer : public std::enable_shared_from_this<FileBuffer> {
* @return the size of the buffered data
*/
size_t get_size() const { return _size; }
+ size_t get_capacaticy() const { return _capacity; }
+ Slice get_slice() const;
/**
* detect whether the execution task is done
*
@@ -124,11 +117,13 @@ struct FileBuffer : public std::enable_shared_from_this<FileBuffer> {
*/
bool is_cancelled() const { return _state.is_cancelled(); }
+ BufferType _type;
std::function<FileBlocksHolderPtr()> _alloc_holder;
- Slice _buffer;
size_t _offset;
size_t _size;
OperationState _state;
+ struct PartData;
+ std::unique_ptr<PartData> _inner_data;
size_t _capacity;
};
@@ -136,11 +131,10 @@ struct UploadFileBuffer final : public FileBuffer {
UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb,
OperationState state,
size_t offset, std::function<FileBlocksHolderPtr()>
alloc_holder,
size_t index_offset)
- : FileBuffer(alloc_holder, offset, state),
+ : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
_upload_to_remote(std::move(upload_cb)),
_index_offset(index_offset) {}
~UploadFileBuffer() override = default;
- void submit() override;
/**
* set the index offset
*
@@ -158,6 +152,8 @@ struct UploadFileBuffer final : public FileBuffer {
* local file cache
*/
void upload_to_local_file_cache(bool);
+
+ void execute_async() override { on_upload(); }
/**
* do the upload work
* 1. read from cache if the data is written to cache first
@@ -166,25 +162,7 @@ struct UploadFileBuffer final : public FileBuffer {
* 4. call the finish callback caller specified
* 5. reclaim self
*/
- void on_upload() {
- if (_buffer.empty()) {
- read_from_cache();
- }
- _upload_to_remote(*this);
- if (config::enable_flush_file_cache_async) {
- // If we call is_cancelled() after _state.set_val() then there
might one situation where
- // s3 file writer is already destructed
- bool cancelled = is_cancelled();
- _state.set_val();
- // this control flow means the buf and the stream shares one memory
- // so we can directly use buf here
- upload_to_local_file_cache(cancelled);
- } else {
- upload_to_local_file_cache(is_cancelled());
- _state.set_val();
- }
- on_finish();
- }
+ void on_upload();
/**
*
* @return the stream representing the inner memory buffer
@@ -207,6 +185,7 @@ private:
decltype(_holder->file_segments.begin()) _cur_file_segment;
size_t _append_offset {0};
size_t _index_offset {0};
+ uint32_t _crc_value = 0;
};
struct FileBufferBuilder {
@@ -216,7 +195,7 @@ struct FileBufferBuilder {
* build one file buffer using previously set properties
* @return the file buffer's base shared pointer
*/
- std::shared_ptr<FileBuffer> build();
+ Status build(std::shared_ptr<FileBuffer>* buf);
/**
* set the file buffer type
*
@@ -310,43 +289,5 @@ struct FileBufferBuilder {
size_t _offset;
size_t _index_offset;
};
-
-class S3FileBufferPool {
-public:
- S3FileBufferPool() = default;
- ~S3FileBufferPool() = default;
-
- // should be called one and only once
- // at startup
- void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
- doris::ThreadPool* thread_pool);
-
- /**
- *
- * @return singleton of the S3FileBufferPool
- */
- static S3FileBufferPool* GetInstance() {
- return ExecEnv::GetInstance()->get_s3_file_buffer_pool();
- }
-
- void reclaim(Slice buf);
-
- /**
- *
- * @param reserve must return buffer with memory allocated
- * @return memory buffer
- */
- Slice allocate(bool reserve = false);
-
- ThreadPool* thread_pool() { return _thread_pool; }
-
-private:
- std::mutex _lock;
- std::condition_variable _cv;
- std::unique_ptr<char[]> _whole_mem_buffer;
- std::list<Slice> _free_raw_buffers;
- // not owned
- ThreadPool* _thread_pool = nullptr;
-};
} // namespace io
} // namespace doris
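Read together with the S3FileWriter changes in the next file, the buffer lifecycle under the new
pool-free API looks roughly like the sketch below. The enclosing function, callback bodies and the
slice argument are placeholders (offset and file-cache holder setters are omitted); the builder
chaining, build(), append_data() and the static FileBuffer::submit() are the entry points introduced
here.

    // Rough lifecycle sketch under the new API. build() can now fail because each buffer
    // allocates its own memory (Allocator-backed PartData), and submit() schedules
    // execute_async() on ExecEnv's s3_file_upload_thread_pool.
    doris::Status upload_one_part_sketch(const doris::Slice& slice) {
        doris::io::FileBufferBuilder builder;
        builder.set_type(doris::io::BufferType::UPLOAD)
                .set_upload_callback([](doris::io::UploadFileBuffer& b) { /* upload the part */ })
                .set_sync_after_complete_task([](doris::Status s) { return !s.ok(); })
                .set_is_cancelled([]() { return false; });
        std::shared_ptr<doris::io::FileBuffer> buf;
        RETURN_IF_ERROR(builder.build(&buf));      // wraps the allocation in RETURN_IF_CATCH_EXCEPTION
        RETURN_IF_ERROR(buf->append_data(slice));  // memcpy into the buffer plus a CRC32C update
        RETURN_IF_ERROR(doris::io::FileBuffer::submit(std::move(buf)));
        return doris::Status::OK();
    }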
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index dd93bbebe32..dbe5ce8e70c 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -98,6 +98,8 @@ S3FileWriter::S3FileWriter(std::string key, std::shared_ptr<S3FileSystem> fs,
_cache_key = IFileCache::hash(_path.filename().native());
_cache = FileCacheFactory::instance()->get_by_path(_cache_key);
}
+
+ _create_empty_file = opts ? opts->create_empty_file : true;
}
S3FileWriter::~S3FileWriter() {
@@ -161,7 +163,6 @@ Status S3FileWriter::_abort() {
}
// we need to reclaim the memory
if (_pending_buf) {
- _pending_buf->on_finish();
_pending_buf = nullptr;
}
LOG(INFO) << "S3FileWriter::abort, path: " << _path.native();
@@ -199,17 +200,46 @@ Status S3FileWriter::close() {
return _st;
}
VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
- // it might be one file less than 5MB, we do upload here
- if (_pending_buf != nullptr) {
- if (_upload_id.empty()) {
- auto buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+
+ if (_upload_id.empty()) {
+ if (_pending_buf != nullptr) {
+ // it might be one file less than 5MB, we do upload here
+ auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) {
_put_object(b); });
+ } else if (_create_empty_file) {
+ // if there is no pending buffer, we need to create an empty file
+ auto builder = FileBufferBuilder();
+ builder.set_type(BufferType::UPLOAD)
+ .set_upload_callback([this](UploadFileBuffer& buf) {
_put_object(buf); })
+ .set_sync_after_complete_task([this](Status s) {
+ bool ret = false;
+ if (!s.ok()) [[unlikely]] {
+ VLOG_NOTICE << "failed at key: " << _key
+ << ", status: " << s.to_string();
+ std::unique_lock<std::mutex> _lck
{_completed_lock};
+ _failed = true;
+ ret = true;
+ this->_st = std::move(s);
+ }
+ // After the signal, there is a scenario where the
previous invocation of _wait_until_finish
+ // returns to the caller, and subsequently, the S3
file writer is destructed.
+ // This means that accessing _failed afterwards would
result in a heap use after free vulnerability.
+ _countdown_event.signal();
+ return ret;
+ })
+ .set_is_cancelled([this]() { return _failed.load(); });
+ RETURN_IF_ERROR(builder.build(&_pending_buf));
+ auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+ DCHECK(buf != nullptr);
}
+ }
+ if (_pending_buf != nullptr) {
_countdown_event.add_count();
- _pending_buf->submit();
+ RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
_pending_buf = nullptr;
}
+
DBUG_EXECUTE_IF("s3_file_writer::close", {
RETURN_IF_ERROR(_complete());
return Status::InternalError("failed to close s3 file writer");
@@ -273,7 +303,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
return
std::make_unique<FileBlocksHolder>(std::move(holder));
});
}
- _pending_buf = builder.build();
+ RETURN_IF_ERROR(builder.build(&_pending_buf));
}
// we need to make sure all parts except the last one to be 5MB or
more
// and shouldn't be larger than buf
@@ -296,7 +326,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
}
_cur_part_num++;
_countdown_event.add_count();
- _pending_buf->submit();
+ RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
_pending_buf = nullptr;
}
_bytes_appended += data_size_to_append;
@@ -334,16 +364,20 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
"injected error",
_bucket, _path.native(), part_num, _upload_id);
LOG_WARNING(s.to_string());
- buf.set_val(s);
+ buf.set_status(std::move(s));
return;
}
});
if (!upload_part_outcome.IsSuccess()) {
- _st = s3fs_error(upload_part_outcome.GetError(),
- fmt::format("failed to upload part {}, part_num={},
upload_id={}",
- _path.native(), part_num, _upload_id));
- LOG(WARNING) << _st;
- buf.set_val(_st);
+ auto s = Status::IOError(
+ "failed to upload part (bucket={}, key={}, part_num={},
up_load_id={}): {}, "
+ "exception {}, error code {}",
+ _bucket, _path.native(), part_num, _upload_id,
+ upload_part_outcome.GetError().GetMessage(),
+ upload_part_outcome.GetError().GetExceptionName(),
+ upload_part_outcome.GetError().GetResponseCode());
+ LOG_WARNING(s.to_string());
+ buf.set_status(std::move(s));
return;
}
s3_bytes_written_total << buf.get_size();
@@ -431,12 +465,12 @@ Status S3FileWriter::finalize() {
// if we only need to upload one file less than 5MB, we can just
// call PutObject to reduce the network IO
if (_upload_id.empty()) {
- auto buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+ auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) {
_put_object(b); });
}
_countdown_event.add_count();
- _pending_buf->submit();
+ RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
_pending_buf = nullptr;
}
_wait_until_finish("finalize");
@@ -454,7 +488,7 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
request.SetContentType("application/octet-stream");
DBUG_EXECUTE_IF("s3_file_writer::_put_object", {
_st = Status::InternalError("failed to put object");
- buf.set_val(_st);
+ buf.set_status(_st);
LOG(WARNING) << _st;
return;
});
@@ -464,7 +498,7 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
_st = s3fs_error(response.GetError(), fmt::format("failed to put
object {}, upload_id={}",
_path.native(),
_upload_id));
LOG(WARNING) << _st;
- buf.set_val(_st);
+ buf.set_status(_st);
return;
}
_bytes_written += buf.get_size();
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 991ad4a831b..278b02db4d9 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -707,7 +707,7 @@ Status BaseBetaRowsetWriter::_create_file_writer(std::string path, io::FileWrite
_context.file_cache_ttl_sec > 0 &&
_context.newest_write_timestamp > 0
? _context.newest_write_timestamp +
_context.file_cache_ttl_sec
: 0,
- };
+ .create_empty_file = false};
Status st = fs->create_file(path, &file_writer, &opts);
if (!st.ok()) {
LOG(WARNING) << "failed to create writable file. path=" << path << ",
err: " << st;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 0f36278d120..1f6b1b7e8b9 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -52,7 +52,6 @@ namespace taskgroup {
class TaskGroupManager;
}
namespace io {
-class S3FileBufferPool;
class FileCacheFactory;
} // namespace io
namespace segment_v2 {
@@ -251,7 +250,6 @@ public:
TabletSchemaCache* get_tablet_schema_cache() { return
_tablet_schema_cache; }
StorageEngine* get_storage_engine() { return _storage_engine; }
- io::S3FileBufferPool* get_s3_file_buffer_pool() { return _s3_buffer_pool; }
SchemaCache* schema_cache() { return _schema_cache; }
StoragePageCache* get_storage_page_cache() { return _storage_page_cache; }
SegmentLoader* segment_loader() { return _segment_loader; }
@@ -364,7 +362,6 @@ private:
// these redundancy header could introduce potential bug, at least, more
header means slow compile.
// So we choose to use raw pointer, please remember to delete these
pointer in deconstructor.
TabletSchemaCache* _tablet_schema_cache = nullptr;
- io::S3FileBufferPool* _s3_buffer_pool = nullptr;
StorageEngine* _storage_engine = nullptr;
SchemaCache* _schema_cache = nullptr;
StoragePageCache* _storage_page_cache = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index c7278af9be2..f5f08a71064 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -253,11 +253,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_tablet_schema_cache =
TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity);
- // S3 buffer pool
- _s3_buffer_pool = new io::S3FileBufferPool();
- _s3_buffer_pool->init(config::s3_write_buffer_whole_size,
config::s3_write_buffer_size,
- this->s3_file_upload_thread_pool());
-
// Storage engine
doris::EngineOptions options;
options.store_paths = store_paths;
@@ -552,7 +547,6 @@ void ExecEnv::destroy() {
SAFE_SHUTDOWN(_lazy_release_obj_pool);
SAFE_SHUTDOWN(_send_report_thread_pool);
SAFE_SHUTDOWN(_send_batch_thread_pool);
- SAFE_DELETE(_s3_buffer_pool);
_deregister_metrics();
SAFE_DELETE(_load_channel_mgr);
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp
index bf3d09cdda6..d3228d33680 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -141,6 +141,7 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
FileFactory::convert_storage_type(_storage_type),
_state->exec_env(),
_file_opts->broker_addresses, _file_opts->broker_properties,
file_name, 0,
_file_writer_impl));
+ RETURN_IF_ERROR(_file_writer_impl->open());
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
_vfile_writer.reset(new VCSVTransformer(_state,
_file_writer_impl.get(),
diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp
index b37d358f657..5ff1cc5e48a 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -30,7 +30,6 @@
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
-#include "io/fs/s3_file_bufferpool.h"
#include "io/fs/s3_file_system.h"
#include "io/io_common.h"
#include "runtime/exec_env.h"
@@ -69,10 +68,6 @@ public:
.build(&_s3_file_upload_thread_pool));
ExecEnv::GetInstance()->_s3_file_upload_thread_pool =
std::move(_s3_file_upload_thread_pool);
- ExecEnv::GetInstance()->_s3_buffer_pool = new io::S3FileBufferPool();
- io::S3FileBufferPool::GetInstance()->init(
- config::s3_write_buffer_whole_size,
config::s3_write_buffer_size,
- ExecEnv::GetInstance()->_s3_file_upload_thread_pool.get());
}
static void TearDownTestSuite() {
@@ -81,8 +76,6 @@ public:
}
ExecEnv::GetInstance()->_s3_file_upload_thread_pool->shutdown();
ExecEnv::GetInstance()->_s3_file_upload_thread_pool = nullptr;
- delete ExecEnv::GetInstance()->_s3_buffer_pool;
- ExecEnv::GetInstance()->_s3_buffer_pool = nullptr;
}
void SetUp() override {
diff --git a/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
new file mode 100644
index 00000000000..260c177d310
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_base1 --
+
+-- !select_tvf1 --
+
+-- !select_tvf2 --
+
+-- !select_tvf3 --
+
diff --git a/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
new file mode 100644
index 00000000000..1804fff2a11
--- /dev/null
+++ b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_outfile_empty_data", "external,hive,tvf,external_docker") {
+
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable Hive test.")
+ return;
+ }
+
+ // open nereids
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_fallback_to_original_planner=false """
+
+ // use to outfile to hdfs
+ String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ // It's okay to use random `hdfsUser`, but can not be empty.
+ def hdfsUserName = "doris"
+ def format = "csv"
+ def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"
+
+ // use to outfile to s3
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+
+ // broker
+ String broker_name = "hdfs"
+
+ def export_table_name = "outfile_empty_data_test"
+
+ def create_table = {table_name, column_define ->
+ sql """ DROP TABLE IF EXISTS ${table_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ ${column_define}
+ )
+ DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+ """
+ }
+
+ def outfile_to_HDFS_directly = {
+ // select ... into outfile ...
+ def uuid = UUID.randomUUID().toString()
+
+ hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
+ uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"
+
+ def res = sql """
+ SELECT * FROM ${export_table_name} t ORDER BY user_id
+ INTO OUTFILE "${uri}"
+ FORMAT AS ${format}
+ PROPERTIES (
+ "fs.defaultFS"="${defaultFS}",
+ "hadoop.username" = "${hdfsUserName}"
+ );
+ """
+ logger.info("outfile to hdfs direct success path: " + res[0][3]);
+ return res[0][3]
+ }
+
+ def outfile_to_HDFS_with_broker = {
+ // select ... into outfile ...
+ def uuid = UUID.randomUUID().toString()
+
+ hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
+ uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"
+
+ def res = sql """
+ SELECT * FROM ${export_table_name} t ORDER BY user_id
+ INTO OUTFILE "${uri}"
+ FORMAT AS ${format}
+ PROPERTIES (
+ "broker.fs.defaultFS"="${defaultFS}",
+ "broker.name"="hdfs",
+ "broker.username" = "${hdfsUserName}"
+ );
+ """
+ logger.info("outfile to hdfs with broker success path: " + res[0][3]);
+ return res[0][3]
+ }
+
+ def outfile_to_S3_directly = {
+ // select ... into outfile ...
+ s3_outfile_path = "${bucket}/outfile/csv/test-outfile-empty/"
+ uri = "s3://${s3_outfile_path}/exp_"
+
+ def res = sql """
+ SELECT * FROM ${export_table_name} t ORDER BY user_id
+ INTO OUTFILE "${uri}"
+ FORMAT AS csv
+ PROPERTIES (
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}"
+ );
+ """
+ logger.info("outfile to s3 success path: " + res[0][3]);
+ return res[0][3]
+ }
+
+ try {
+ def doris_column_define = """
+ `user_id` INT NOT NULL COMMENT "用户id",
+ `name` STRING NULL,
+ `age` INT NULL"""
+ // create table
+ create_table(export_table_name, doris_column_define);
+ // test outfile empty data to hdfs directly
+ def outfile_to_hdfs_directly_url = outfile_to_HDFS_directly()
+ // test outfile empty data to hdfs with broker
+ def outfile_to_hdfs_with_broker_url= outfile_to_HDFS_with_broker()
+ // test outfile empty data to s3 directly
+ def outfile_to_s3_directly_url = outfile_to_S3_directly()
+ qt_select_base1 """ SELECT * FROM ${export_table_name} ORDER BY
user_id; """
+
+ qt_select_tvf1 """ select * from HDFS(
+ "uri" = "${outfile_to_hdfs_directly_url}0.csv",
+ "hadoop.username" = "${hdfsUserName}",
+ "format" = "${format}");
+ """
+
+ qt_select_tvf2 """ select * from HDFS(
+ "uri" = "${outfile_to_hdfs_with_broker_url}0.csv",
+ "hadoop.username" = "${hdfsUserName}",
+ "format" = "${format}");
+ """
+
+ qt_select_tvf3 """ SELECT * FROM S3 (
+ "uri" =
"http://${s3_endpoint}${outfile_to_s3_directly_url.substring(4,
outfile_to_s3_directly_url.length())}0.csv",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format}",
+ "region" = "${region}",
+ "use_path_style" = "true"
+ );
+ """
+
+ } finally {
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]