This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d8d3e72534 [enhance](S3FileWriter) Add MD5 check for small files not suitable for multipart upload (#22296)
d8d3e72534 is described below

commit d8d3e72534ddac68b371d88d42fe24ffe0a8123f
Author: AlexYue <[email protected]>
AuthorDate: Thu Aug 10 18:11:07 2023 +0800

    [enhance](S3FileWriter) Add MD5 check for small files not suitable for multipart upload (#22296)
---
 be/src/io/fs/s3_file_write_bufferpool.h |  2 +-
 be/src/io/fs/s3_file_writer.cpp         | 30 +++++++++++++--------
 be/src/io/fs/s3_file_writer.h           | 46 ++++-----------------------------
 3 files changed, 25 insertions(+), 53 deletions(-)
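For context: the multipart path below already computes an MD5 digest per part; this patch extends the same integrity check to the single-shot PutObject path used for files too small for a multipart upload. A minimal sketch of the pattern with the AWS C++ SDK, using only calls that appear in the diff (the helper name and its parameters are illustrative, not from the patch):

#include <aws/core/utils/HashingUtils.h>
#include <aws/s3/model/PutObjectRequest.h>

#include <memory>

// Illustrative helper: attach a Content-MD5 header so S3 can reject a
// corrupted upload server-side instead of storing a bad object.
Aws::S3::Model::PutObjectRequest make_checked_put_request(
        const Aws::String& bucket, const Aws::String& key,
        const std::shared_ptr<Aws::IOStream>& body, size_t body_size) {
    Aws::S3::Model::PutObjectRequest request;
    request.WithBucket(bucket).WithKey(key);
    // Hash the same stream that will be sent as the request body.
    Aws::Utils::ByteBuffer md5(Aws::Utils::HashingUtils::CalculateMD5(*body));
    request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(md5));
    request.SetBody(body);
    request.SetContentLength(body_size);
    request.SetContentType("application/octet-stream");
    return request;
}

S3 verifies the Content-MD5 header against the bytes it receives and rejects the request on mismatch, so a corrupted small-file upload fails fast instead of persisting a bad object.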

diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index 55fa53df42..f87a78289f 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -71,7 +71,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     // get the size of the content already appended
     size_t get_size() const { return _size; }
     // get the underlying stream containing the buffered data
-    std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; }
+    const std::shared_ptr<std::iostream>& get_stream() const { return _stream_ptr; }
     // get file offset corresponding to the buffer
     size_t get_file_offset() const { return _offset; }
     // set the offset of the buffer
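A note on the get_stream() change above: returning the shared_ptr by value bumps its atomic reference count on every call, while returning a const reference does not; callers that need to extend the stream's lifetime can still copy explicitly. A standalone illustration (the type and member names are illustrative):

#include <iostream>
#include <memory>

struct Holder {
    std::shared_ptr<std::iostream> _stream_ptr;

    // By value: each call atomically increments/decrements use_count.
    std::shared_ptr<std::iostream> stream_copy() const { return _stream_ptr; }

    // By const reference: no refcount traffic; the caller copies only
    // when it actually needs shared ownership.
    const std::shared_ptr<std::iostream>& stream_ref() const { return _stream_ptr; }
};

The .cpp diff below pairs this with dropping the local _stream_ptr copy in _upload_one_part() and dereferencing buf.get_stream() directly.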
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 6b7bd6ad8e..519470ebae 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -123,11 +123,19 @@ Status S3FileWriter::_create_multi_upload_request() {
                           _bucket, _path.native(), _upload_id, outcome.GetError().GetMessage());
 }
 
-void S3FileWriter::_wait_until_finish(std::string task_name) {
+void S3FileWriter::_wait_until_finish(std::string_view task_name) {
     auto msg =
             fmt::format("{} multipart upload already takes 5 min, bucket={}, 
key={}, upload_id={}",
-                        std::move(task_name), _bucket, _path.native(), 
_upload_id);
-    while (!_wait.wait()) {
+                        task_name, _bucket, _path.native(), _upload_id);
+    timespec current_time;
+    // We don't need high accuracy here, so we use time(nullptr),
+    // since it's the fastest way to get the current time (in seconds)
+    auto current_time_second = time(nullptr);
+    current_time.tv_sec = current_time_second + 300;
+    current_time.tv_nsec = 0;
+    // bthread::countdown_event::timed_wait() should use absolute time
+    while (0 != _countdown_event.timed_wait(current_time)) {
+        current_time.tv_sec += 300;
         LOG(WARNING) << msg;
     }
 }
@@ -184,7 +192,7 @@ Status S3FileWriter::close() {
             _pending_buf->set_upload_remote_callback(
                     [this, buf = _pending_buf]() { _put_object(*buf); });
         }
-        _wait.add();
+        _countdown_event.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
@@ -212,7 +220,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
                         });
                 _pending_buf->set_file_offset(_bytes_appended);
                 // later we might need to wait all prior tasks to be finished
-                _pending_buf->set_finish_upload([this]() { _wait.done(); });
+                _pending_buf->set_finish_upload([this]() { _countdown_event.signal(); });
                 _pending_buf->set_is_cancel([this]() { return _failed.load(); });
                 _pending_buf->set_on_failed([this, part_num = _cur_part_num](Status st) {
                     VLOG_NOTICE << "failed at key: " << _key << ", load part " << part_num
@@ -241,7 +249,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
                     RETURN_IF_ERROR(_create_multi_upload_request());
                 }
                 _cur_part_num++;
-                _wait.add();
+                _countdown_event.add_count();
                 _pending_buf->submit();
                 _pending_buf = nullptr;
             }
@@ -259,11 +267,9 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
     upload_request.WithBucket(_bucket).WithKey(_key).WithPartNumber(part_num).WithUploadId(
             _upload_id);
 
-    auto _stream_ptr = buf.get_stream();
-
     upload_request.SetBody(buf.get_stream());
 
-    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*_stream_ptr));
+    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream()));
     upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
 
     upload_request.SetContentLength(buf.get_size());
@@ -286,7 +292,7 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
 
     std::unique_ptr<CompletedPart> completed_part = std::make_unique<CompletedPart>();
     completed_part->SetPartNumber(part_num);
-    auto etag = upload_part_outcome.GetResult().GetETag();
+    const auto& etag = upload_part_outcome.GetResult().GetETag();
     // DCHECK(etag.empty());
     completed_part->SetETag(etag);
 
@@ -345,7 +351,7 @@ Status S3FileWriter::finalize() {
             _pending_buf->set_upload_remote_callback(
                     [this, buf = _pending_buf]() { _put_object(*buf); });
         }
-        _wait.add();
+        _countdown_event.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
@@ -357,6 +363,8 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
     DCHECK(!_closed) << "closed " << _closed;
     Aws::S3::Model::PutObjectRequest request;
     request.WithBucket(_bucket).WithKey(_key);
+    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream()));
+    request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
     request.SetBody(buf.get_stream());
     request.SetContentLength(buf.get_size());
     request.SetContentType("application/octet-stream");
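A note on the wait-loop change in this file: the removed WaitGroup::wait() took a relative timeout in seconds, whereas bthread::CountdownEvent::timed_wait() takes an absolute timespec deadline and returns 0 once the count reaches zero (nonzero, e.g. ETIMEDOUT, on timeout), which is why the loop keeps extending the deadline. A minimal standalone sketch of the same loop, assuming brpc's bthread/countdown_event.h; the 300-second interval matches the patch, and the function name is illustrative:

#include <bthread/countdown_event.h>

#include <ctime>
#include <iostream>

// Illustrative stand-in for the patch's _wait_until_finish():
// warn every 5 minutes until all in-flight uploads have signaled.
void wait_until_finish(bthread::CountdownEvent& event) {
    timespec deadline;
    // time(nullptr) is only second-accurate, but as the patch's comments
    // note, that is cheap and precise enough here.
    deadline.tv_sec = time(nullptr) + 300;
    deadline.tv_nsec = 0;
    // timed_wait() expects an absolute deadline, so extend it each round.
    while (0 != event.timed_wait(deadline)) {
        deadline.tv_sec += 300;
        std::cerr << "upload still running after another 5 min" << std::endl;
    }
}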
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index d8956da88a..2c139242ed 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <aws/core/utils/memory/stl/AWSStringStream.h>
+#include <bthread/countdown_event.h>
 
 #include <cstddef>
 #include <list>
@@ -57,48 +58,10 @@ public:
         return Status::NotSupported("not support");
     }
 
-    int64_t upload_cost_ms() const { return *_upload_cost_ms; }
+    [[nodiscard]] int64_t upload_cost_ms() const { return *_upload_cost_ms; }
 
 private:
-    class WaitGroup {
-    public:
-        WaitGroup() = default;
-
-        ~WaitGroup() = default;
-
-        WaitGroup(const WaitGroup&) = delete;
-        WaitGroup(WaitGroup&&) = delete;
-        void operator=(const WaitGroup&) = delete;
-        void operator=(WaitGroup&&) = delete;
-        // add one to the counter, indicating one more concurrent worker
-        void add(int count = 1) { _count += count; }
-
-        // decrease the count when one concurrent worker has finished its work
-        void done() {
-            _count--;
-            if (_count.load() <= 0) {
-                _cv.notify_all();
-            }
-        }
-
-        // wait for all concurrent workers to finish their work and return true;
-        // returns false on timeout (the default timeout is 5 min)
-        bool wait(int64_t timeout_seconds = 300) {
-            if (_count.load() <= 0) {
-                return true;
-            }
-            std::unique_lock<std::mutex> lck {_lock};
-            _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
-                         [this]() { return _count.load() <= 0; });
-            return _count.load() <= 0;
-        }
-
-    private:
-        std::mutex _lock;
-        std::condition_variable _cv;
-        std::atomic_int64_t _count {0};
-    };
-    void _wait_until_finish(std::string task_name);
+    void _wait_until_finish(std::string_view task_name);
     Status _complete();
     Status _create_multi_upload_request();
     void _put_object(S3FileBuffer& buf);
@@ -119,7 +82,8 @@ private:
     std::mutex _completed_lock;
     std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
 
-    WaitGroup _wait;
+    // **Attention** call add_count() before submitting buf to async thread pool
+    bthread::CountdownEvent _countdown_event {0};
 
     std::atomic_bool _failed = false;
     Status _st = Status::OK();
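On the "**Attention**" comment above: the member is initialized to a count of 0 (bthread::CountdownEvent otherwise defaults to an initial count of 1), and a waiter treats a zero count as "all uploads finished". add_count() must therefore run before the buffer reaches the async pool; otherwise close() or finalize() could start waiting in the window between submit() and a late add_count() and return while an upload is still in flight. A hedged sketch of the required ordering, with std::thread standing in for the buffer pool:

#include <bthread/countdown_event.h>

#include <functional>
#include <thread>

// Illustrative: count the task before handing it off, so a waiter can
// never observe a zero count while an upload is still in flight.
void submit_upload(bthread::CountdownEvent& event, std::function<void()> upload) {
    event.add_count();  // must precede the hand-off (the "Attention" above)
    std::thread([&event, upload = std::move(upload)] {
        upload();        // do the work...
        event.signal();  // ...then decrement, mirroring set_finish_upload()
    }).detach();
}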


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
