This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 3cd0bdf3ba8 [feature](IO) Replace file writer's finalize function with async close (#34679)
3cd0bdf3ba8 is described below
commit 3cd0bdf3ba8c002ae092b3d87bd766cb18e85d21
Author: AlexYue <[email protected]>
AuthorDate: Mon May 13 16:44:58 2024 +0800
    [feature](IO) Replace file writer's finalize function with async close (#34679)
---
be/src/common/config.cpp | 5 +
be/src/common/config.h | 5 +
be/src/io/fs/broker_file_writer.cpp | 30 ++++--
be/src/io/fs/broker_file_writer.h | 8 +-
be/src/io/fs/file_writer.h | 20 ++--
be/src/io/fs/hdfs_file_writer.cpp | 81 +++++++++--------
be/src/io/fs/hdfs_file_writer.h | 14 ++-
be/src/io/fs/local_file_writer.cpp | 59 ++++++++----
be/src/io/fs/local_file_writer.h | 10 +-
be/src/io/fs/s3_file_writer.cpp | 101 ++++++++-------------
be/src/io/fs/s3_file_writer.h | 12 ++-
be/src/io/fs/stream_sink_file_writer.cpp | 29 ++++--
be/src/io/fs/stream_sink_file_writer.h | 11 +--
be/src/olap/rowset/beta_rowset_writer.cpp | 4 +-
.../segment_v2/inverted_index_fs_directory.cpp | 10 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 2 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 2 +-
be/src/olap/storage_engine.h | 1 +
be/src/pipeline/pipeline_tracing.cpp | 2 -
be/src/runtime/exec_env.h | 2 +
be/src/runtime/exec_env_init.cpp | 31 +++++++
be/src/runtime/load_stream_writer.cpp | 4 +-
be/test/io/cache/block_file_cache_test.cpp | 1 -
be/test/io/fs/hdfs_file_system_test.cpp | 3 -
be/test/io/fs/local_file_system_test.cpp | 6 --
be/test/io/fs/s3_file_writer_test.cpp | 8 --
be/test/io/fs/stream_sink_file_writer_test.cpp | 2 +-
be/test/olap/tablet_cooldown_test.cpp | 6 +-
...index_compound_directory_fault_injection.groovy | 7 --
29 files changed, 268 insertions(+), 208 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 8055e3fe456..4ec90cfa886 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1263,6 +1263,11 @@ DEFINE_mInt64(hdfs_jni_write_sleep_milliseconds, "300");
// The max retry times when hdfs write failed
DEFINE_mInt64(hdfs_jni_write_max_retry_time, "3");
+// The min thread num for NonBlockCloseThreadPool
+DEFINE_Int64(min_nonblock_close_thread_num, "12");
+// The max thread num for NonBlockCloseThreadPool
+DEFINE_Int64(max_nonblock_close_thread_num, "64");
+
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 46dcdb8658b..7ff833513eb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1337,6 +1337,11 @@ DECLARE_mInt64(hdfs_jni_write_sleep_milliseconds);
// The max retry times when hdfs write failed
DECLARE_mInt64(hdfs_jni_write_max_retry_time);
+// The min thread num for NonBlockCloseThreadPool
+DECLARE_Int64(min_nonblock_close_thread_num);
+// The max thread num for NonBlockCloseThreadPool
+DECLARE_Int64(max_nonblock_close_thread_num);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp
index f162b02b19e..2e5af74090a 100644
--- a/be/src/io/fs/broker_file_writer.cpp
+++ b/be/src/io/fs/broker_file_writer.cpp
@@ -28,6 +28,7 @@
#include "common/config.h"
#include "common/logging.h"
+#include "io/fs/file_writer.h"
#include "runtime/broker_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
@@ -60,12 +61,29 @@ inline const std::string& client_id(ExecEnv* env, const TNetworkAddress& addr) {
}
#endif
-Status BrokerFileWriter::close() {
- if (_closed) {
+Status BrokerFileWriter::close(bool non_block) {
+ if (_close_state == FileWriterState::CLOSED) {
+ return Status::InternalError("BrokerFileWriter already closed, file
path {}",
+ _path.native());
+ }
+ if (_close_state == FileWriterState::ASYNC_CLOSING) {
+ if (non_block) {
+ return Status::InternalError("Don't submit async close multi
times");
+ }
+ // Actucally the first time call to close(true) would return the value
of _finalize, if it returned one
+ // error status then the code would never call the second close(true)
+ _close_state = FileWriterState::CLOSED;
return Status::OK();
}
- _closed = true;
+ if (non_block) {
+ _close_state = FileWriterState::ASYNC_CLOSING;
+ } else {
+ _close_state = FileWriterState::CLOSED;
+ }
+ return _close_impl();
+}
+Status BrokerFileWriter::_close_impl() {
TBrokerCloseWriterRequest request;
request.__set_version(TBrokerVersion::VERSION_ONE);
request.__set_fd(_fd);
@@ -117,7 +135,7 @@ Status BrokerFileWriter::close() {
}
Status BrokerFileWriter::appendv(const Slice* data, size_t data_cnt) {
- if (_closed) [[unlikely]] {
+ if (_close_state != FileWriterState::OPEN) [[unlikely]] {
return Status::InternalError("append to closed file: {}",
_path.native());
}
@@ -135,10 +153,6 @@ Status BrokerFileWriter::appendv(const Slice* data, size_t data_cnt) {
return Status::OK();
}
-Status BrokerFileWriter::finalize() {
- return Status::OK();
-}
-
 Result<FileWriterPtr> BrokerFileWriter::create(ExecEnv* env, const TNetworkAddress& broker_address,
                                                const std::map<std::string, std::string>& properties,
                                                Path path) {
diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h
index d6fce52a05c..ad065a47327 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -43,17 +43,17 @@ public:
     BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, Path path, TBrokerFD fd);
~BrokerFileWriter() override;
+ Status close(bool non_block = false) override;
- Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
- Status finalize() override;
const Path& path() const override { return _path; }
size_t bytes_appended() const override { return _cur_offset; }
- bool closed() const override { return _closed; }
+ FileWriterState closed() const override { return _close_state; }
     FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; }
private:
Status _write(const uint8_t* buf, size_t buf_len, size_t* written_bytes);
+ Status _close_impl();
private:
ExecEnv* _env = nullptr;
@@ -61,7 +61,7 @@ private:
Path _path;
size_t _cur_offset = 0;
TBrokerFD _fd;
- bool _closed = false;
+ FileWriterState _close_state {FileWriterState::OPEN};
};
} // end namespace io
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index ad13f7f0095..56b7f731c51 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -17,6 +17,7 @@
#pragma once
+#include <future>
#include <memory>
#include "common/status.h"
@@ -46,6 +47,17 @@ struct FileWriterOptions {
uint64_t file_cache_expiration = 0; // Absolute time
};
+struct AsyncCloseStatusPack {
+ std::promise<Status> promise;
+ std::future<Status> future;
+};
+
+enum class FileWriterState : uint8_t {
+ OPEN = 0,
+ ASYNC_CLOSING,
+ CLOSED,
+};
+
class FileWriter {
public:
FileWriter() = default;
@@ -56,21 +68,17 @@ public:
// Normal close. Wait for all data to persist before returning.
// If there is no data appended, an empty file will be persisted.
- virtual Status close() = 0;
+ virtual Status close(bool non_block = false) = 0;
Status append(const Slice& data) { return appendv(&data, 1); }
virtual Status appendv(const Slice* data, size_t data_cnt) = 0;
- // Call this method when there is no more data to write.
-    // FIXME(cyx): Does not seem to be an appropriate interface for file system?
- virtual Status finalize() = 0;
-
virtual const Path& path() const = 0;
virtual size_t bytes_appended() const = 0;
- virtual bool closed() const = 0;
+ virtual FileWriterState closed() const = 0;
virtual FileCacheAllocatorBuilder* cache_builder() const = 0;
};
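The new contract is easiest to see as a small state machine: close(true) moves the writer from OPEN to ASYNC_CLOSING and submits the real close work; a later blocking close() reaps the status and moves it to CLOSED; any further close() is an error. Below is a minimal, self-contained sketch of that protocol; ToyWriter, its int statuses, and the bare std::thread are hypothetical stand-ins for this patch's FileWriter, Status, and NonBlockCloseThreadPool:

    #include <cassert>
    #include <cstdint>
    #include <future>
    #include <iostream>
    #include <thread>

    enum class FileWriterState : uint8_t { OPEN = 0, ASYNC_CLOSING, CLOSED };

    struct ToyWriter {
        FileWriterState state = FileWriterState::OPEN;
        std::promise<int> promise;
        std::future<int> future;
        std::thread worker;

        int close_impl() { return 0; }  // stand-in for the real flush/close work

        int close(bool non_block = false) {
            if (state == FileWriterState::CLOSED) return -1;  // already closed
            if (state == FileWriterState::ASYNC_CLOSING) {
                if (non_block) return -1;  // don't submit async close twice
                int st = future.get();     // wait for the submitted task
                state = FileWriterState::CLOSED;
                return st;
            }
            if (non_block) {
                state = FileWriterState::ASYNC_CLOSING;
                future = promise.get_future();
                worker = std::thread([this] { promise.set_value(close_impl()); });
                return 0;  // submitted, not yet closed
            }
            state = FileWriterState::CLOSED;
            return close_impl();
        }

        ~ToyWriter() {
            if (worker.joinable()) worker.join();
        }
    };

    int main() {
        ToyWriter w;
        assert(w.close(true) == 0);  // submit async close (was: finalize())
        assert(w.close() == 0);      // blocking close: reap the status
        assert(w.close() == -1);     // closing an already closed writer fails
        std::cout << "state machine ok\n";
    }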
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index a37bd323984..7a499239bd0 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -36,8 +36,10 @@
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
#include "io/fs/err_utils.h"
+#include "io/fs/file_writer.h"
#include "io/fs/hdfs_file_system.h"
#include "io/hdfs_util.h"
+#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/bvar_helper.h"
#include "util/jni-util.h"
@@ -142,6 +144,11 @@ HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler,
}
HdfsFileWriter::~HdfsFileWriter() {
+ if (_async_close_pack != nullptr) {
+ // For thread safety
+ std::ignore = _async_close_pack->promise.get_future();
+ _async_close_pack = nullptr;
+ }
if (_hdfs_file) {
SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
@@ -195,13 +202,41 @@ Status HdfsFileWriter::_acquire_jni_memory(size_t size) {
#endif
}
-Status HdfsFileWriter::close() {
- if (_closed) {
- return Status::OK();
+Status HdfsFileWriter::close(bool non_block) {
+ if (closed() == FileWriterState::CLOSED) {
+ return Status::InternalError("HdfsFileWriter already closed, file path
{}, fs name {}",
+ _path.native(), _fs_name);
+ }
+ if (closed() == FileWriterState::ASYNC_CLOSING) {
+ if (non_block) {
+ return Status::InternalError("Don't submit async close multi
times");
+ }
+ CHECK(_async_close_pack != nullptr);
+ _st = _async_close_pack->future.get();
+ _async_close_pack = nullptr;
+ // We should wait for all the pre async task to be finished
+ _close_state = FileWriterState::CLOSED;
+ // The next time we call close() with no matter non_block true or
false, it would always return the
+ // '_st' value because this writer is already closed.
+ return _st;
+ }
+ if (non_block) {
+ _close_state = FileWriterState::ASYNC_CLOSING;
+ _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
+ _async_close_pack->future = _async_close_pack->promise.get_future();
+        return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func(
+                [&]() { _async_close_pack->promise.set_value(_close_impl()); });
}
- _closed = true;
+ _st = _close_impl();
+ _close_state = FileWriterState::CLOSED;
+ return _st;
+}
+
+Status HdfsFileWriter::_close_impl() {
if (_batch_buffer.size() != 0) {
- RETURN_IF_ERROR(_flush_buffer());
+ if (_st = _flush_buffer(); !_st.ok()) {
+ return _st;
+ }
}
int ret;
if (_sync_file_data) {
@@ -220,9 +255,10 @@ Status HdfsFileWriter::close() {
Status::InternalError("failed
to sync hdfs file"));
if (ret != 0) {
- return Status::InternalError(
+ _st = Status::InternalError(
"failed to sync hdfs file. fs_name={} path={} : {},
file_size={}", _fs_name,
_path.native(), hdfs_error(), bytes_appended());
+ return _st;
}
}
@@ -238,10 +274,11 @@ Status HdfsFileWriter::close() {
TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsCloseFile",
Status::InternalError("failed to
close hdfs file"));
if (ret != 0) {
- return Status::InternalError(
+ _st = Status::InternalError(
"Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err:
{}, file_size={}",
BackendOptions::get_localhost(), _fs_name, _path.native(),
hdfs_error(),
bytes_appended());
+ return _st;
}
hdfs_file_created_total << 1;
return Status::OK();
@@ -368,7 +405,7 @@ Status HdfsFileWriter::_append(std::string_view content) {
}
Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {
- if (_closed) [[unlikely]] {
+ if (_close_state != FileWriterState::OPEN) [[unlikely]] {
return Status::InternalError("append to closed file: {}",
_path.native());
}
@@ -378,34 +415,6 @@ Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {
return Status::OK();
}
-// Call this method when there is no more data to write.
-Status HdfsFileWriter::finalize() {
- if (_closed) [[unlikely]] {
- return Status::InternalError("finalize closed file: {}, file_size={}",
_path.native(),
- bytes_appended());
- }
- if (_batch_buffer.size() != 0) {
- RETURN_IF_ERROR(_flush_buffer());
- }
-
- // Flush buffered data to HDFS without waiting for HDFS response
- int ret;
- {
- SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
-        ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file),
-                                           "HdfsFileWriter::finalize::hdfsFlush");
- }
- TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsFlush",
-                                           Status::InternalError("failed to flush hdfs file"));
- if (ret != 0) {
- return Status::InternalError(
-                "failed to flush hdfs file. fs_name={} path={} : {}, file_size={}", _fs_name,
- _path.native(), hdfs_error(), bytes_appended());
- }
-
- return Status::OK();
-}
-
 Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, std::shared_ptr<HdfsHandler> handler,
                                              const std::string& fs_name,
                                              const FileWriterOptions* opts) {
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 4ed30c5a856..c5393042185 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -29,6 +29,7 @@ namespace io {
class HdfsHandler;
class BlockFileCache;
struct FileCacheAllocatorBuilder;
+struct AsyncCloseStatusPack;
class HdfsFileWriter final : public FileWriter {
public:
@@ -44,18 +45,19 @@ public:
                    std::string fs_name, const FileWriterOptions* opts = nullptr);
~HdfsFileWriter() override;
- Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
- Status finalize() override;
const Path& path() const override { return _path; }
size_t bytes_appended() const override { return _bytes_appended; }
- bool closed() const override { return _closed; }
+ FileWriterState closed() const override { return _close_state; }
+
+ Status close(bool non_block = false) override;
FileCacheAllocatorBuilder* cache_builder() const override {
return _cache_builder == nullptr ? nullptr : _cache_builder.get();
}
private:
+ Status _close_impl();
     // Flush buffered data into HDFS client and write local file cache if enabled
// **Notice**: this would clear the underlying buffer
Status _flush_buffer();
@@ -70,7 +72,6 @@ private:
hdfsFile _hdfs_file = nullptr;
std::string _fs_name;
size_t _bytes_appended = 0;
- bool _closed = false;
bool _sync_file_data;
     std::unique_ptr<FileCacheAllocatorBuilder>
             _cache_builder; // nullptr if write file cache is disabled
@@ -90,6 +91,11 @@ private:
};
BatchBuffer _batch_buffer;
size_t _approximate_jni_buffer_size = 0;
+ std::unique_ptr<AsyncCloseStatusPack> _async_close_pack;
+    // We should make sure that _close_impl's return value stays consistent,
+    // so we add one field to store the value returned by the first call to _close_impl
+ Status _st;
+ FileWriterState _close_state {FileWriterState::OPEN};
};
} // namespace io
diff --git a/be/src/io/fs/local_file_writer.cpp b/be/src/io/fs/local_file_writer.cpp
index 2e4a970d6c2..4a5da18e5de 100644
--- a/be/src/io/fs/local_file_writer.cpp
+++ b/be/src/io/fs/local_file_writer.cpp
@@ -36,6 +36,7 @@
#include "common/sync_point.h"
#include "gutil/macros.h"
#include "io/fs/err_utils.h"
+#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "olap/data_dir.h"
@@ -78,7 +79,7 @@ size_t LocalFileWriter::bytes_appended() const {
}
LocalFileWriter::~LocalFileWriter() {
- if (!_closed) {
+ if (_close_state == FileWriterState::OPEN) {
_abort();
}
DorisMetrics::instance()->local_file_open_writing->increment(-1);
@@ -86,7 +87,25 @@ LocalFileWriter::~LocalFileWriter() {
DorisMetrics::instance()->local_bytes_written_total->increment(_bytes_appended);
}
-Status LocalFileWriter::close() {
+Status LocalFileWriter::close(bool non_block) {
+ if (_close_state == FileWriterState::CLOSED) {
+ return Status::InternalError("LocalFileWriter already closed, file
path {}",
+ _path.native());
+ }
+ if (_close_state == FileWriterState::ASYNC_CLOSING) {
+ if (non_block) {
+ return Status::InternalError("Don't submit async close multi
times");
+ }
+ // Actucally the first time call to close(true) would return the value
of _finalize, if it returned one
+ // error status then the code would never call the second close(true)
+ _close_state = FileWriterState::CLOSED;
+ return Status::OK();
+ }
+ if (non_block) {
+ _close_state = FileWriterState::ASYNC_CLOSING;
+ } else {
+ _close_state = FileWriterState::CLOSED;
+ }
return _close(_sync_data);
}
@@ -104,7 +123,7 @@ void LocalFileWriter::_abort() {
Status LocalFileWriter::appendv(const Slice* data, size_t data_cnt) {
TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::appendv",
Status::IOError("inject io error"));
- if (_closed) [[unlikely]] {
+ if (_close_state != FileWriterState::OPEN) [[unlikely]] {
return Status::InternalError("append to closed file: {}",
_path.native());
}
_dirty = true;
@@ -159,10 +178,11 @@ Status LocalFileWriter::appendv(const Slice* data, size_t data_cnt) {
return Status::OK();
}
-Status LocalFileWriter::finalize() {
+// TODO(ByteYue): Refactor this function as FileWriter::flush()
+Status LocalFileWriter::_finalize() {
TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::finalize",
Status::IOError("inject io error"));
- if (_closed) [[unlikely]] {
+ if (_close_state == FileWriterState::OPEN) [[unlikely]] {
return Status::InternalError("finalize closed file: {}",
_path.native());
}
@@ -178,39 +198,42 @@ Status LocalFileWriter::finalize() {
}
Status LocalFileWriter::_close(bool sync) {
- if (_closed) {
- return Status::OK();
- }
+ auto fd_reclaim_func = [&](Status st) {
+ if (_fd > 9 && 0 != ::close(_fd)) {
+            return localfs_error(errno, fmt::format("failed to {}, along with failed to close {}",
+ st, _path.native()));
+ }
+ _fd = -1;
+ return st;
+ };
if (sync) {
if (_dirty) {
#ifdef __APPLE__
if (fcntl(_fd, F_FULLFSYNC) < 0) [[unlikely]] {
-                return localfs_error(errno, fmt::format("failed to sync {}", _path.native()));
+                return fd_reclaim_func(
+                        localfs_error(errno, fmt::format("failed to sync {}", _path.native())));
}
#else
if (0 != ::fdatasync(_fd)) [[unlikely]] {
-                return localfs_error(errno, fmt::format("failed to sync {}", _path.native()));
+                return fd_reclaim_func(
+                        localfs_error(errno, fmt::format("failed to sync {}", _path.native())));
}
#endif
_dirty = false;
}
- RETURN_IF_ERROR(sync_dir(_path.parent_path()));
+ RETURN_IF_ERROR(fd_reclaim_func(sync_dir(_path.parent_path())));
}
- if (0 != ::close(_fd)) {
-        return localfs_error(errno, fmt::format("failed to close {}", _path.native()));
- }
- _closed = true;
-
DBUG_EXECUTE_IF("LocalFileWriter.close.failed", {
// spare '.testfile' to make bad disk checker happy
if (_path.filename().compare(kTestFilePath)) {
-            return Status::IOError("cannot close {}: {}", _path.native(), std::strerror(errno));
+            return fd_reclaim_func(
+                    Status::IOError("cannot close {}: {}", _path.native(), std::strerror(errno)));
}
});
TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::close",
Status::IOError("inject io error"));
- return Status::OK();
+ return fd_reclaim_func(Status::OK());
}
} // namespace doris::io
diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/local_file_writer.h
index 81ebb0ebd1f..d80065825d2 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/io/fs/local_file_writer.h
@@ -31,26 +31,26 @@ public:
LocalFileWriter(Path path, int fd, bool sync_data = true);
~LocalFileWriter() override;
- Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
- Status finalize() override;
const Path& path() const override { return _path; }
size_t bytes_appended() const override;
- bool closed() const override { return _closed; }
+ FileWriterState closed() const override { return _close_state; }
     FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; }
+ Status close(bool non_block = false) override;
+
private:
+ Status _finalize();
void _abort();
Status _close(bool sync);
-private:
Path _path;
int _fd; // owned
bool _dirty = false;
- bool _closed = false;
const bool _sync_data = true;
size_t _bytes_appended = 0;
+ FileWriterState _close_state {FileWriterState::OPEN};
};
} // namespace doris::io
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 952377314a1..7c3af1fa6fb 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -41,6 +41,7 @@
#include <glog/logging.h>
#include <sstream>
+#include <tuple>
#include <utility>
#include "common/config.h"
@@ -54,6 +55,7 @@
#include "io/fs/path.h"
#include "io/fs/s3_file_bufferpool.h"
#include "io/fs/s3_file_system.h"
+#include "runtime/exec_env.h"
#include "util/bvar_helper.h"
#include "util/debug_points.h"
#include "util/defer_op.h"
@@ -65,7 +67,6 @@ namespace Aws::S3::Model {
class DeleteObjectRequest;
} // namespace Aws::S3::Model
-using Aws::S3::Model::AbortMultipartUploadRequest;
using Aws::S3::Model::CompletedPart;
using Aws::S3::Model::CompletedMultipartUpload;
using Aws::S3::Model::CompleteMultipartUploadRequest;
@@ -102,11 +103,13 @@ S3FileWriter::S3FileWriter(std::shared_ptr<Aws::S3::S3Client> client, std::strin
}
S3FileWriter::~S3FileWriter() {
- if (!closed() || _failed) {
- // if we don't abort multi part upload, the uploaded part in object
-        // store will not automatically reclaim itself, it would cost more money
- static_cast<void>(_abort());
- } else {
+ if (_async_close_pack != nullptr) {
+ // For thread safety
+ std::ignore = _async_close_pack->promise.get_future();
+ _async_close_pack = nullptr;
+ }
+    // We won't do the S3 abort operation in BE; we let the S3 service handle it on its own.
+ if (closed() == FileWriterState::OPEN && !_failed) {
s3_bytes_written_total << _bytes_appended;
}
s3_file_being_written << -1;
@@ -153,48 +156,39 @@ void S3FileWriter::_wait_until_finish(std::string_view task_name) {
}
}
-Status S3FileWriter::_abort() {
- // make all pending work early quits
- _failed = true;
-
- // we need to reclaim the memory
- if (_pending_buf) {
- _pending_buf = nullptr;
+Status S3FileWriter::close(bool non_block) {
+ if (closed() == FileWriterState::CLOSED) {
+ return Status::InternalError("S3FileWriter already closed, file path
{}, file key {}",
+ _path.native(), _key);
}
- LOG(INFO) << "S3FileWriter::abort, path: " << _path.native();
- // upload id is empty means there was no create multi upload
- if (_upload_id.empty()) {
- return Status::OK();
+ if (closed() == FileWriterState::ASYNC_CLOSING) {
+ if (non_block) {
+ return Status::InternalError("Don't submit async close multi
times");
+ }
+ CHECK(_async_close_pack != nullptr);
+ _st = _async_close_pack->future.get();
+ _async_close_pack = nullptr;
+ // We should wait for all the pre async task to be finished
+ _close_state = FileWriterState::CLOSED;
+ // The next time we call close() with no matter non_block true or
false, it would always return the
+ // '_st' value because this writer is already closed.
+ return _st;
}
- _wait_until_finish("Abort");
- AbortMultipartUploadRequest request;
- request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
- SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
- auto outcome = _client->AbortMultipartUpload(request);
- if (outcome.IsSuccess() ||
-        outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD ||
-        outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
-        LOG(INFO) << "Abort multipart upload successfully"
-                  << "bucket=" << _bucket << ", key=" << _path.native()
-                  << ", upload_id=" << _upload_id << ", whole parts=" << _dump_completed_part();
- return Status::OK();
+ if (non_block) {
+ _close_state = FileWriterState::ASYNC_CLOSING;
+ _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
+ _async_close_pack->future = _async_close_pack->promise.get_future();
+        return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func(
+                [&]() { _async_close_pack->promise.set_value(_close_impl()); });
}
- return s3fs_error(
- outcome.GetError(),
-            fmt::format("failed to abort multipart upload {} upload_id={}, whole parts={}",
- _path.native(), _upload_id, _dump_completed_part()));
+ _st = _close_impl();
+ _close_state = FileWriterState::CLOSED;
+ return _st;
}
-Status S3FileWriter::close() {
- if (closed()) {
- _wait_until_finish("close");
- return _st;
- }
-
+Status S3FileWriter::_close_impl() {
VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
- Defer defer {[this] { _closed = true; }};
-
if (_upload_id.empty() && _pending_buf) {
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}
@@ -248,7 +242,7 @@ Status S3FileWriter::close() {
}
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
- if (closed()) [[unlikely]] {
+ if (closed() != FileWriterState::OPEN) [[unlikely]] {
return Status::InternalError("append to closed file: {}",
_path.native());
}
@@ -457,27 +451,6 @@ Status S3FileWriter::_complete() {
return Status::OK();
}
-Status S3FileWriter::finalize() {
- if (closed()) [[unlikely]] {
- return Status::InternalError("finalize closed file: {}",
_path.native());
- }
-
- DBUG_EXECUTE_IF("s3_file_writer::finalize",
-                    { return Status::IOError("failed to finalize due to injected error"); });
- // submit pending buf if it's not nullptr
- // it's the last buf, we can submit it right now
- if (_pending_buf != nullptr) {
- if (_upload_id.empty()) {
- RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
- }
- _countdown_event.add_count();
- RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
- _pending_buf = nullptr;
- }
- _wait_until_finish("finalize");
- return _st;
-}
-
Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
@@ -497,7 +470,7 @@ Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
}
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
- DCHECK(!closed());
+    DCHECK(closed() != FileWriterState::CLOSED) << fmt::format("state is {}", closed());
Aws::S3::Model::PutObjectRequest request;
request.WithBucket(_bucket).WithKey(_key);
     Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream()));
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 9f72b02da60..b7d39a48245 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -42,6 +42,7 @@ struct S3Conf;
namespace io {
struct S3FileBuffer;
class S3FileSystem;
+struct AsyncCloseStatusPack;
class S3FileWriter final : public FileWriter {
public:
@@ -49,14 +50,11 @@ public:
const FileWriterOptions* opts);
~S3FileWriter() override;
- Status close() override;
-
Status appendv(const Slice* data, size_t data_cnt) override;
- Status finalize() override;
const Path& path() const override { return _path; }
size_t bytes_appended() const override { return _bytes_appended; }
- bool closed() const override { return _closed; }
+ FileWriterState closed() const override { return _close_state; }
FileCacheAllocatorBuilder* cache_builder() const override {
return _cache_builder == nullptr ? nullptr : _cache_builder.get();
@@ -70,7 +68,10 @@ public:
const std::string& bucket() const { return _bucket; }
const std::string& upload_id() const { return _upload_id; }
+ Status close(bool non_block = false) override;
+
private:
+ Status _close_impl();
Status _abort();
[[nodiscard]] std::string _dump_completed_part() const;
void _wait_until_finish(std::string_view task_name);
@@ -97,7 +98,6 @@ private:
std::atomic_bool _failed = false;
- bool _closed = false;
Status _st;
size_t _bytes_appended = 0;
@@ -113,6 +113,8 @@ private:
// Because hive committers have best-effort semantics,
// this shortens the inconsistent time window.
bool _used_by_s3_committer;
+ std::unique_ptr<AsyncCloseStatusPack> _async_close_pack;
+ FileWriterState _close_state {FileWriterState::OPEN};
};
} // namespace io
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp
index faac2bf55d9..4555a9c8c4b 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -111,7 +111,29 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
return Status::OK();
}
-Status StreamSinkFileWriter::finalize() {
+Status StreamSinkFileWriter::close(bool non_block) {
+ if (_close_state == FileWriterState::CLOSED) {
+ return Status::InternalError("StreamSinkFileWriter already closed,
load id {}",
+ print_id(_load_id));
+ }
+ if (_close_state == FileWriterState::ASYNC_CLOSING) {
+ if (non_block) {
+ return Status::InternalError("Don't submit async close multi
times");
+ }
+ // Actucally the first time call to close(true) would return the value
of _finalize, if it returned one
+ // error status then the code would never call the second close(true)
+ _close_state = FileWriterState::CLOSED;
+ return Status::OK();
+ }
+ if (non_block) {
+ _close_state = FileWriterState::ASYNC_CLOSING;
+ } else {
+ _close_state = FileWriterState::CLOSED;
+ }
+ return _finalize();
+}
+
+Status StreamSinkFileWriter::_finalize() {
VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ",
index_id: " << _index_id
<< ", tablet_id: " << _tablet_id << ", segment_id: " <<
_segment_id;
// TODO(zhengyu): update get_inverted_index_file_size into stat
@@ -144,9 +166,4 @@ Status StreamSinkFileWriter::finalize() {
return Status::OK();
}
-Status StreamSinkFileWriter::close() {
- _closed = true;
- return Status::OK();
-}
-
} // namespace doris::io
diff --git a/be/src/io/fs/stream_sink_file_writer.h b/be/src/io/fs/stream_sink_file_writer.h
index 4a0eb955c26..9769f96331b 100644
--- a/be/src/io/fs/stream_sink_file_writer.h
+++ b/be/src/io/fs/stream_sink_file_writer.h
@@ -44,13 +44,9 @@ public:
Status appendv(const Slice* data, size_t data_cnt) override;
- Status finalize() override;
-
- Status close() override;
-
size_t bytes_appended() const override { return _bytes_appended; }
- bool closed() const override { return _closed; }
+ FileWriterState closed() const override { return _close_state; }
// FIXME(plat1ko): Maybe it's an inappropriate abstraction?
const Path& path() const override {
@@ -60,7 +56,10 @@ public:
     FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; }
+ Status close(bool non_block = false) override;
+
private:
+ Status _finalize();
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
PUniqueId _load_id;
@@ -68,8 +67,8 @@ private:
int64_t _index_id;
int64_t _tablet_id;
int32_t _segment_id;
- bool _closed = false;
size_t _bytes_appended = 0;
+ FileWriterState _close_state {FileWriterState::OPEN};
};
} // namespace io
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 8085fc70ebc..ef3539fe0c5 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -130,7 +130,9 @@ Status SegmentFileCollection::close() {
}
for (auto&& [_, writer] : _file_writers) {
- RETURN_IF_ERROR(writer->close());
+ if (writer->closed() != io::FileWriterState::CLOSED) {
+ RETURN_IF_ERROR(writer->close());
+ }
}
return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index c4936e9c453..1c6e75aa97e 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -317,15 +317,7 @@ void DorisFSDirectory::FSIndexOutput::close() {
_CLTHROWA(err.number(), err.what());
}
if (_writer) {
- Status ret = _writer->finalize();
-        DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error",
-                        { ret = Status::Error<INTERNAL_ERROR>("writer finalize status error"); })
-        if (!ret.ok()) {
-            LOG(WARNING) << "FSIndexOutput close, file writer finalize error: " << ret.to_string();
-            _writer.reset(nullptr);
-            _CLTHROWA(CL_ERR_IO, ret.to_string().c_str());
-        }
- ret = _writer->close();
+ auto ret = _writer->close();
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error",
{ ret = Status::Error<INTERNAL_ERROR>("writer close
status error"); })
if (!ret.ok()) {
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 10ce2137b9c..939c504580f 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -1076,7 +1076,7 @@ Status SegmentWriter::finalize_columns_index(uint64_t* index_size) {
Status SegmentWriter::finalize_footer(uint64_t* segment_file_size) {
RETURN_IF_ERROR(_write_footer());
// finish
- RETURN_IF_ERROR(_file_writer->finalize());
+ RETURN_IF_ERROR(_file_writer->close(true));
*segment_file_size = _file_writer->bytes_appended();
if (*segment_file_size == 0) {
return Status::Corruption("Bad segment, file size = 0");
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index b0b24a79c0a..7466f7861c5 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -938,7 +938,7 @@ Status VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) {
Status VerticalSegmentWriter::finalize_footer(uint64_t* segment_file_size) {
RETURN_IF_ERROR(_write_footer());
// finish
- RETURN_IF_ERROR(_file_writer->finalize());
+ RETURN_IF_ERROR(_file_writer->close(true));
*segment_file_size = _file_writer->bytes_appended();
if (*segment_file_size == 0) {
return Status::Corruption("Bad segment, file size = 0");
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 860886bc9b7..2907c4b7a9b 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -434,6 +434,7 @@ private:
std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
std::unique_ptr<ThreadPool> _single_replica_compaction_thread_pool;
+
std::unique_ptr<ThreadPool> _seg_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool;
diff --git a/be/src/pipeline/pipeline_tracing.cpp b/be/src/pipeline/pipeline_tracing.cpp
index 047e3c3a01d..d9029951682 100644
--- a/be/src/pipeline/pipeline_tracing.cpp
+++ b/be/src/pipeline/pipeline_tracing.cpp
@@ -116,7 +116,6 @@ void PipelineTracerContext::_dump_query(TUniqueId query_id) {
THROW_IF_ERROR(writer.appendv(&text, 1));
}
- THROW_IF_ERROR(writer.finalize());
THROW_IF_ERROR(writer.close());
_last_dump_time = MonotonicSeconds();
@@ -156,7 +155,6 @@ void PipelineTracerContext::_dump_timeslice() {
THROW_IF_ERROR(writer.appendv(&text, 1));
}
}
- THROW_IF_ERROR(writer.finalize());
THROW_IF_ERROR(writer.close());
_last_dump_time = MonotonicSeconds();
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 0f6aa71c974..24877103384 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -197,6 +197,7 @@ public:
     ThreadPool* send_report_thread_pool() { return _send_report_thread_pool.get(); }
     ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); }
     ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); }
+ ThreadPool* non_block_close_thread_pool();
Status init_pipeline_task_scheduler();
void init_file_cache_factory();
@@ -368,6 +369,7 @@ private:
std::unique_ptr<ThreadPool> _join_node_thread_pool;
// Pool to use a new thread to release object
std::unique_ptr<ThreadPool> _lazy_release_obj_pool;
+ std::unique_ptr<ThreadPool> _non_block_close_thread_pool;
FragmentMgr* _fragment_mgr = nullptr;
pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index ba0473ee10c..8d654c8d09b 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -111,6 +111,23 @@
#include "runtime/memory/tcmalloc_hook.h"
#endif
+// Used for unit test
+namespace {
+std::once_flag flag;
+std::unique_ptr<doris::ThreadPool> non_block_close_thread_pool;
+void init_threadpool_for_test() {
+ static_cast<void>(doris::ThreadPoolBuilder("NonBlockCloseThreadPool")
+ .set_min_threads(12)
+ .set_max_threads(48)
+ .build(&non_block_close_thread_pool));
+}
+
+[[maybe_unused]] doris::ThreadPool* get_non_block_close_thread_pool() {
+ std::call_once(flag, init_threadpool_for_test);
+ return non_block_close_thread_pool.get();
+}
+} // namespace
+
namespace doris {
class PBackendService_Stub;
class PFunctionService_Stub;
@@ -153,6 +170,14 @@ static pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) {
return {min_num, max_num};
}
+ThreadPool* ExecEnv::non_block_close_thread_pool() {
+#ifdef BE_TEST
+ return get_non_block_close_thread_pool();
+#else
+ return _non_block_close_thread_pool.get();
+#endif
+}
+
 Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
                      const std::vector<StorePath>& spill_store_paths,
                      const std::set<std::string>& broken_paths) {
@@ -235,6 +260,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
.set_max_threads(1)
.set_max_queue_size(1000000)
.build(&_lazy_release_obj_pool));
+    static_cast<void>(ThreadPoolBuilder("NonBlockCloseThreadPool")
+                              .set_min_threads(config::min_nonblock_close_thread_num)
+                              .set_max_threads(config::max_nonblock_close_thread_num)
+                              .build(&_non_block_close_thread_pool));
     // NOTE: runtime query statistics mgr could be visited by query and daemon thread
     // so it should be created before all query begin and deleted after all query and daemon thread stopped
@@ -639,6 +668,7 @@ void ExecEnv::destroy() {
SAFE_SHUTDOWN(_s3_file_upload_thread_pool);
SAFE_SHUTDOWN(_join_node_thread_pool);
SAFE_SHUTDOWN(_lazy_release_obj_pool);
+ SAFE_SHUTDOWN(_non_block_close_thread_pool);
SAFE_SHUTDOWN(_send_report_thread_pool);
SAFE_SHUTDOWN(_send_batch_thread_pool);
@@ -684,6 +714,7 @@ void ExecEnv::destroy() {
     // TODO(zhiqiang): Maybe we should call shutdown before release thread pool?
_join_node_thread_pool.reset(nullptr);
_lazy_release_obj_pool.reset(nullptr);
+ _non_block_close_thread_pool.reset(nullptr);
_send_report_thread_pool.reset(nullptr);
_send_table_stats_thread_pool.reset(nullptr);
_buffered_reader_prefetch_thread_pool.reset(nullptr);
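The BE_TEST branch above leans on a lazily built, process-wide singleton: unit tests never run ExecEnv::_init, so the pool is constructed on first use under std::call_once. A self-contained sketch of that shape, where ToyPool is a hypothetical stand-in for doris::ThreadPool and 12/48 mirror the hard-coded test bounds:

    #include <iostream>
    #include <memory>
    #include <mutex>

    struct ToyPool {
        int min_threads;
        int max_threads;
    };

    namespace {
    std::once_flag g_flag;
    std::unique_ptr<ToyPool> g_pool;

    // First caller builds the pool; every later caller gets the same instance.
    ToyPool* get_non_block_close_thread_pool() {
        std::call_once(g_flag, [] { g_pool.reset(new ToyPool {12, 48}); });
        return g_pool.get();
    }
    } // namespace

    int main() {
        std::cout << get_non_block_close_thread_pool()->max_threads << '\n';  // 48
        std::cout << (get_non_block_close_thread_pool() ==
                      get_non_block_close_thread_pool())
                  << '\n';  // 1: always the same instance
        return 0;
    }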
diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp
index 535fbf772c9..32503bbfdb7 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -176,7 +176,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
if (file_writer == nullptr) {
return Status::Corruption("add_segment failed, file writer {} is
destoryed", segid);
}
- if (!file_writer->closed()) {
+ if (file_writer->closed() != io::FileWriterState::CLOSED) {
return Status::Corruption("add_segment failed, segment {} is not
closed",
file_writer->path().native());
}
@@ -209,7 +209,7 @@ Status LoadStreamWriter::close() {
}
for (const auto& writer : _segment_file_writers) {
- if (!writer->closed()) {
+ if (writer->closed() != io::FileWriterState::CLOSED) {
return Status::Corruption("LoadStreamWriter close failed, segment
{} is not closed",
writer->path().native());
}
diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp
index 362df33aa28..919680f1c16 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -136,7 +136,6 @@ public:
}
std::string data(1, '0');
ASSERT_TRUE(writer->append(Slice(data.data(), data.size())).ok());
- ASSERT_TRUE(writer->finalize().ok());
ASSERT_TRUE(writer->close().ok());
}
ExecEnv::GetInstance()->_file_cache_factory = factory.get();
diff --git a/be/test/io/fs/hdfs_file_system_test.cpp b/be/test/io/fs/hdfs_file_system_test.cpp
index 87fe5d52dd8..41c26ccedf7 100644
--- a/be/test/io/fs/hdfs_file_system_test.cpp
+++ b/be/test/io/fs/hdfs_file_system_test.cpp
@@ -110,9 +110,6 @@ TEST(HdfsFileSystemTest, Write) {
st = hdfs_file_writer->append(content_2M);
ASSERT_TRUE(st.ok()) << st;
- st = hdfs_file_writer->finalize();
- ASSERT_TRUE(st.ok()) << st;
-
st = hdfs_file_writer->close();
ASSERT_TRUE(st.ok()) << st;
diff --git a/be/test/io/fs/local_file_system_test.cpp b/be/test/io/fs/local_file_system_test.cpp
index 556fb1adecc..a264ca799dd 100644
--- a/be/test/io/fs/local_file_system_test.cpp
+++ b/be/test/io/fs/local_file_system_test.cpp
@@ -91,8 +91,6 @@ TEST_F(LocalFileSystemTest, WriteRead) {
Slice slices[2] {abc, bcd};
st = file_writer->appendv(slices, 2);
ASSERT_TRUE(st.ok()) << st;
- st = file_writer->finalize();
- ASSERT_TRUE(st.ok()) << st;
st = file_writer->close();
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(file_writer->bytes_appended(), 115);
@@ -143,8 +141,6 @@ TEST_F(LocalFileSystemTest, Exist) {
io::FileWriterPtr file_writer;
auto st = io::global_local_filesystem()->create_file(fname, &file_writer);
ASSERT_TRUE(st.ok()) << st;
- st = file_writer->finalize();
- ASSERT_TRUE(st.ok()) << st;
st = file_writer->close();
ASSERT_TRUE(st.ok()) << st;
ASSERT_TRUE(check_exist(fname));
@@ -155,8 +151,6 @@ TEST_F(LocalFileSystemTest, List) {
auto fname = fmt::format("{}/abc", test_dir);
auto st = io::global_local_filesystem()->create_file(fname, &file_writer);
ASSERT_TRUE(st.ok()) << st;
- st = file_writer->finalize();
- ASSERT_TRUE(st.ok()) << st;
st = file_writer->close();
ASSERT_TRUE(st.ok()) << st;
ASSERT_TRUE(check_exist(fname));
diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp
index 7d7d71ab318..9e0923d7d53 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -123,7 +123,6 @@ TEST_F(S3FileWriterTest, multi_part_io_error) {
offset += bytes_read;
}
ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
- ASSERT_TRUE(!s3_file_writer->finalize().ok());
// The second part would fail uploading itself to s3
// so the result of close should be not ok
ASSERT_TRUE(!s3_file_writer->close().ok());
@@ -166,7 +165,6 @@ TEST_F(S3FileWriterTest, put_object_io_error) {
offset += bytes_read;
}
ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
- ASSERT_TRUE(!s3_file_writer->finalize().ok());
// The object might be timeout but still succeed in loading
ASSERT_TRUE(!s3_file_writer->close().ok());
}
@@ -266,7 +264,6 @@ TEST_F(S3FileWriterTest, normal) {
offset += bytes_read;
}
ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
- ASSERT_TRUE(s3_file_writer->finalize().ok());
ASSERT_EQ(Status::OK(), s3_file_writer->close());
int64_t s3_file_size = 0;
ASSERT_EQ(Status::OK(), s3_fs->file_size("normal", &s3_file_size));
@@ -299,7 +296,6 @@ TEST_F(S3FileWriterTest, smallFile) {
offset += bytes_read;
}
ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
- ASSERT_TRUE(s3_file_writer->finalize().ok());
ASSERT_EQ(Status::OK(), s3_file_writer->close());
int64_t s3_file_size = 0;
ASSERT_EQ(Status::OK(), s3_fs->file_size("small", &s3_file_size));
@@ -359,7 +355,6 @@ TEST_F(S3FileWriterTest, finalize_error) {
offset += bytes_read;
}
ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
- ASSERT_TRUE(!s3_file_writer->finalize().ok());
bool exits = false;
static_cast<void>(s3_fs->exists("finalize_error", &exits));
ASSERT_TRUE(!exits);
@@ -396,7 +391,6 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_2) {
offset += bytes_read;
}
ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
- ASSERT_TRUE(s3_file_writer->finalize().ok());
// The second part would fail uploading itself to s3
// so the result of close should be not ok
auto st = s3_file_writer->close();
@@ -435,7 +429,6 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_1) {
offset += bytes_read;
}
ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
- ASSERT_TRUE(s3_file_writer->finalize().ok());
// The second part would fail uploading itself to s3
// so the result of close should be not ok
auto st = s3_file_writer->close();
@@ -474,7 +467,6 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_3) {
offset += bytes_read;
}
ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
- ASSERT_TRUE(s3_file_writer->finalize().ok());
// The second part would fail uploading itself to s3
// so the result of close should be not ok
auto st = s3_file_writer->close();
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp
index ad6e496c56f..b9b0e0818cf 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -106,7 +106,7 @@ TEST_F(StreamSinkFileWriterTest, Test) {
CHECK_STATUS_OK(writer.appendv(&(*slices.begin()), slices.size()));
EXPECT_EQ(NUM_STREAM, g_num_request);
- CHECK_STATUS_OK(writer.finalize());
+ CHECK_STATUS_OK(writer.close());
EXPECT_EQ(NUM_STREAM * 2, g_num_request);
}
diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp
index 49de1826104..953be73862d 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -99,15 +99,13 @@ public:
~FileWriterMock() override = default;
- Status close() override { return _local_file_writer->close(); }
+    Status close(bool /*non_block*/) override { return _local_file_writer->close(); }
Status appendv(const Slice* data, size_t data_cnt) override {
return _local_file_writer->appendv(data, data_cnt);
}
- Status finalize() override { return _local_file_writer->finalize(); }
-
- bool closed() const override { return _local_file_writer->closed(); }
+    io::FileWriterState closed() const override { return _local_file_writer->closed(); }

     size_t bytes_appended() const override { return _local_file_writer->bytes_appended(); }
diff --git a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
index 759c409a850..4a7e3d45e90 100644
--- a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
@@ -103,13 +103,6 @@ suite("test_index_compound_directory_failure_injection", "nonConcurrent") {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
}
qt_sql "select COUNT() from ${testTable_dup} where request match
'images'"
- try {
-
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error")
- load_httplogs_data.call(testTable_dup,
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
- } finally {
-
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error")
- }
- qt_sql "select COUNT() from ${testTable_dup} where request match
'images'"
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
load_httplogs_data.call(testTable_dup,
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')