This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5911567826e branch-3.0: [fix](S3FileWriter) Fix boundary issue when multipart upload (#43254)
5911567826e is described below

commit 5911567826ef91e0e431966d268371ed1545bcbf
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 7 11:15:15 2024 +0800

    branch-3.0: [fix](S3FileWriter) Fix boundary issue when multipart upload (#43254)
    
    Cherry-picked from #43037
    
    Co-authored-by: Gavin Chou <[email protected]>
---
 be/src/io/fs/s3_file_writer.cpp | 54 +++++++++++++++++++++++------------------
 1 file changed, 31 insertions(+), 23 deletions(-)

diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 24b72a4b6c9..e40b9e171eb 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -204,12 +204,12 @@ Status S3FileWriter::_build_upload_buffer() {
 Status S3FileWriter::_close_impl() {
     VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();
 
-    if (_cur_part_num == 1 && _pending_buf) {
+    if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size
         RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
     }
 
     if (_bytes_appended == 0) {
-        DCHECK(_cur_part_num == 1);
+        DCHECK_EQ(_cur_part_num, 1);
         // No data written, but need to create an empty file
         RETURN_IF_ERROR(_build_upload_buffer());
         if (!_used_by_s3_committer) {
@@ -220,10 +220,15 @@ Status S3FileWriter::_close_impl() {
         }
     }
 
-    if (_pending_buf != nullptr) {
+    if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded
         _countdown_event.add_count();
         RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
         _pending_buf = nullptr;
+    } else if (_bytes_appended != 0) { // Non-empty file and has nothing to be uploaded
+        // NOTE: When the data size is a multiple of config::s3_write_buffer_size,
+        //       _cur_part_num may exceed the actual number of parts that need to be uploaded.
+        //       This is because it is incremented by 1 in advance within the S3FileWriter::appendv method.
+        _cur_part_num--;
     }
 
     RETURN_IF_ERROR(_complete());
@@ -327,26 +332,29 @@ Status S3FileWriter::_complete() {
     _wait_until_finish("Complete");
     TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
                              std::make_pair(&_failed, &_completed_parts));
-    if (!_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side.
-        if (_failed || _completed_parts.size() != _cur_part_num) {
-            _st = Status::InternalError(
-                    "error status {}, have failed {}, complete parts {}, cur part num {}, whole "
-                    "parts {}, file path {}, file size {}, has left buffer {}",
-                    _st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(),
-                    _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
-            LOG(WARNING) << _st;
-            return _st;
-        }
-        // make sure _completed_parts are ascending order
-        std::sort(_completed_parts.begin(), _completed_parts.end(),
-                  [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
-        TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
-        auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
-        if (resp.status.code != ErrorCode::OK) {
-            LOG_WARNING("Compltet multi part upload failed because {}, file path {}",
-                        resp.status.msg, _obj_storage_path_opts.path.native());
-            return {resp.status.code, std::move(resp.status.msg)};
-        }
+    if (_used_by_s3_committer) {    // S3 committer will complete multipart upload file on FE side.
+        s3_file_created_total << 1; // Assume that it will be created successfully
+        return Status::OK();
+    }
+
+    if (_failed || _completed_parts.size() != _cur_part_num) {
+        _st = Status::InternalError(
+                "error status={} failed={} #complete_parts={} #expected_parts={} "
+                "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
+                _st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(),
+                _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
+        LOG(WARNING) << _st;
+        return _st;
+    }
+    // make sure _completed_parts are ascending order
+    std::sort(_completed_parts.begin(), _completed_parts.end(),
+              [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
+    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
+    auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
+    if (resp.status.code != ErrorCode::OK) {
+        LOG_WARNING("Compltet multi part upload failed because {}, file path {}", resp.status.msg,
+                    _obj_storage_path_opts.path.native());
+        return {resp.status.code, std::move(resp.status.msg)};
     }
     s3_file_created_total << 1;
     return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to