This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5911567826e branch-3.0: [fix](S3FileWriter) Fix boundary issue when multipart upload (#43254)
5911567826e is described below
commit 5911567826ef91e0e431966d268371ed1545bcbf
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 7 11:15:15 2024 +0800
branch-3.0: [fix](S3FileWriter) Fix boundary issue when multipart upload (#43254)
Cherry-picked from #43037
Co-authored-by: Gavin Chou <[email protected]>
---
be/src/io/fs/s3_file_writer.cpp | 54 +++++++++++++++++++++++------------------
1 file changed, 31 insertions(+), 23 deletions(-)
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 24b72a4b6c9..e40b9e171eb 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -204,12 +204,12 @@ Status S3FileWriter::_build_upload_buffer() {
Status S3FileWriter::_close_impl() {
VLOG_DEBUG << "S3FileWriter::close, path: " <<
_obj_storage_path_opts.path.native();
- if (_cur_part_num == 1 && _pending_buf) {
+ if (_cur_part_num == 1 && _pending_buf) { // data size is less than
config::s3_write_buffer_size
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}
if (_bytes_appended == 0) {
- DCHECK(_cur_part_num == 1);
+ DCHECK_EQ(_cur_part_num, 1);
// No data written, but need to create an empty file
RETURN_IF_ERROR(_build_upload_buffer());
if (!_used_by_s3_committer) {
@@ -220,10 +220,15 @@ Status S3FileWriter::_close_impl() {
}
}
- if (_pending_buf != nullptr) {
+ if (_pending_buf != nullptr) { // there is remaining data in buffer need
to be uploaded
_countdown_event.add_count();
RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
_pending_buf = nullptr;
+ } else if (_bytes_appended != 0) { // Non-empty file and has nothing to be
uploaded
+ // NOTE: When the data size is a multiple of
config::s3_write_buffer_size,
+ // _cur_part_num may exceed the actual number of parts that need
to be uploaded.
+ // This is because it is incremented by 1 in advance within the
S3FileWriter::appendv method.
+ _cur_part_num--;
}
RETURN_IF_ERROR(_complete());
@@ -327,26 +332,29 @@ Status S3FileWriter::_complete() {
_wait_until_finish("Complete");
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
std::make_pair(&_failed, &_completed_parts));
- if (!_used_by_s3_committer) { // S3 committer will complete multipart
upload file on FE side.
- if (_failed || _completed_parts.size() != _cur_part_num) {
- _st = Status::InternalError(
- "error status {}, have failed {}, complete parts {}, cur
part num {}, whole "
- "parts {}, file path {}, file size {}, has left buffer {}",
- _st, _failed, _completed_parts.size(), _cur_part_num,
_dump_completed_part(),
- _obj_storage_path_opts.path.native(), _bytes_appended,
_pending_buf != nullptr);
- LOG(WARNING) << _st;
- return _st;
- }
- // make sure _completed_parts are ascending order
- std::sort(_completed_parts.begin(), _completed_parts.end(),
- [](auto& p1, auto& p2) { return p1.part_num < p2.part_num;
});
- TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2",
&_completed_parts);
- auto resp = client->complete_multipart_upload(_obj_storage_path_opts,
_completed_parts);
- if (resp.status.code != ErrorCode::OK) {
- LOG_WARNING("Compltet multi part upload failed because {}, file
path {}",
- resp.status.msg, _obj_storage_path_opts.path.native());
- return {resp.status.code, std::move(resp.status.msg)};
- }
+ if (_used_by_s3_committer) { // S3 committer will complete multipart
upload file on FE side.
+ s3_file_created_total << 1; // Assume that it will be created
successfully
+ return Status::OK();
+ }
+
+ if (_failed || _completed_parts.size() != _cur_part_num) {
+ _st = Status::InternalError(
+ "error status={} failed={} #complete_parts={}
#expected_parts={} "
+ "completed_parts_list={} file_path={} file_size={} has left
buffer not uploaded={}",
+ _st, _failed, _completed_parts.size(), _cur_part_num,
_dump_completed_part(),
+ _obj_storage_path_opts.path.native(), _bytes_appended,
_pending_buf != nullptr);
+ LOG(WARNING) << _st;
+ return _st;
+ }
+ // make sure _completed_parts are ascending order
+ std::sort(_completed_parts.begin(), _completed_parts.end(),
+ [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
+ TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
+ auto resp = client->complete_multipart_upload(_obj_storage_path_opts,
_completed_parts);
+ if (resp.status.code != ErrorCode::OK) {
+ LOG_WARNING("Compltet multi part upload failed because {}, file path
{}", resp.status.msg,
+ _obj_storage_path_opts.path.native());
+ return {resp.status.code, std::move(resp.status.msg)};
}
s3_file_created_total << 1;
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]