This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d8d3e72534 [enhance](S3FIleWriter) Add md5 check for small file not suitable for multi part upload (#22296)
d8d3e72534 is described below
commit d8d3e72534ddac68b371d88d42fe24ffe0a8123f
Author: AlexYue <[email protected]>
AuthorDate: Thu Aug 10 18:11:07 2023 +0800
[enhance](S3FIleWriter) Add md5 check for small file not suitable for multi part upload (#22296)
---
be/src/io/fs/s3_file_write_bufferpool.h | 2 +-
be/src/io/fs/s3_file_writer.cpp | 30 +++++++++++++--------
be/src/io/fs/s3_file_writer.h | 46 ++++-----------------------------
3 files changed, 25 insertions(+), 53 deletions(-)
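For illustration only (not part of the commit): a minimal sketch of the single-PUT path this change hardens, assuming an already-configured Aws::S3::S3Client named `client`; the helper name and the bucket/key/body parameters are hypothetical. The point is that Content-MD5 is computed from the same stream that becomes the request body, so S3 can reject a corrupted small upload instead of storing it silently.

    #include <aws/core/utils/HashingUtils.h>
    #include <aws/s3/S3Client.h>
    #include <aws/s3/model/PutObjectRequest.h>
    #include <memory>

    // Hypothetical helper mirroring the put-object path after this commit.
    bool put_small_object(Aws::S3::S3Client& client, const Aws::String& bucket,
                          const Aws::String& key,
                          const std::shared_ptr<Aws::IOStream>& body, size_t size) {
        Aws::S3::Model::PutObjectRequest request;
        request.WithBucket(bucket).WithKey(key);
        // MD5 over the body, sent as Content-MD5 so the server verifies integrity.
        auto md5 = Aws::Utils::HashingUtils::CalculateMD5(*body);
        request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(md5));
        request.SetBody(body);
        request.SetContentLength(size);
        request.SetContentType("application/octet-stream");
        return client.PutObject(request).IsSuccess();
    }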
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index 55fa53df42..f87a78289f 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -71,7 +71,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
// get the size of the content already appendded
size_t get_size() const { return _size; }
// get the underlying stream containing
- std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; }
+ const std::shared_ptr<std::iostream>& get_stream() const { return _stream_ptr; }
// get file offset corresponding to the buffer
size_t get_file_offset() const { return _offset; }
// set the offset of the buffer
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 6b7bd6ad8e..519470ebae 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -123,11 +123,19 @@ Status S3FileWriter::_create_multi_upload_request() {
_bucket, _path.native(), _upload_id,
outcome.GetError().GetMessage());
}
-void S3FileWriter::_wait_until_finish(std::string task_name) {
+void S3FileWriter::_wait_until_finish(std::string_view task_name) {
auto msg =
fmt::format("{} multipart upload already takes 5 min, bucket={}, key={}, upload_id={}",
- std::move(task_name), _bucket, _path.native(), _upload_id);
- while (!_wait.wait()) {
+ task_name, _bucket, _path.native(), _upload_id);
+ timespec current_time;
+ // We don't need high accuracy here, so we use time(nullptr)
+ // since it's the fastest way to get current time(second)
+ auto current_time_second = time(nullptr);
+ current_time.tv_sec = current_time_second + 300;
+ current_time.tv_nsec = 0;
+ // bthread::countdown_event::timed_wait() should use absolute time
+ while (0 != _countdown_event.timed_wait(current_time)) {
+ current_time.tv_sec += 300;
LOG(WARNING) << msg;
}
}
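A note on the hunk above, for readers unfamiliar with bRPC: bthread::CountdownEvent::timed_wait() takes an absolute deadline, not a relative timeout, which is why the code builds a timespec from time(nullptr) + 300 and pushes it forward on every timeout. A minimal standalone sketch of the same pattern (function name hypothetical, logging simplified):

    #include <bthread/countdown_event.h>
    #include <ctime>
    #include <iostream>

    void wait_until_finish(bthread::CountdownEvent& event) {
        timespec deadline;
        deadline.tv_sec = time(nullptr) + 300;  // absolute deadline, 5 minutes out
        deadline.tv_nsec = 0;
        // timed_wait() returns 0 once the count reaches zero; otherwise the
        // deadline passed, so warn and extend it instead of giving up.
        while (0 != event.timed_wait(deadline)) {
            deadline.tv_sec += 300;
            std::cerr << "upload still in flight after another 5 minutes\n";
        }
    }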
@@ -184,7 +192,7 @@ Status S3FileWriter::close() {
_pending_buf->set_upload_remote_callback(
[this, buf = _pending_buf]() { _put_object(*buf); });
}
- _wait.add();
+ _countdown_event.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
@@ -212,7 +220,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
});
_pending_buf->set_file_offset(_bytes_appended);
// later we might need to wait all prior tasks to be finished
- _pending_buf->set_finish_upload([this]() { _wait.done(); });
+ _pending_buf->set_finish_upload([this]() { _countdown_event.signal(); });
_pending_buf->set_is_cancel([this]() { return _failed.load(); });
_pending_buf->set_on_failed([this, part_num = _cur_part_num](Status st) {
VLOG_NOTICE << "failed at key: " << _key << ", load part " << part_num
@@ -241,7 +249,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
RETURN_IF_ERROR(_create_multi_upload_request());
}
_cur_part_num++;
- _wait.add();
+ _countdown_event.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
@@ -259,11 +267,9 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
upload_request.WithBucket(_bucket).WithKey(_key).WithPartNumber(part_num).WithUploadId(
_upload_id);
- auto _stream_ptr = buf.get_stream();
-
upload_request.SetBody(buf.get_stream());
- Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*_stream_ptr));
+ Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream()));
upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
upload_request.SetContentLength(buf.get_size());
@@ -286,7 +292,7 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
std::unique_ptr<CompletedPart> completed_part = std::make_unique<CompletedPart>();
completed_part->SetPartNumber(part_num);
- auto etag = upload_part_outcome.GetResult().GetETag();
+ const auto& etag = upload_part_outcome.GetResult().GetETag();
// DCHECK(etag.empty());
completed_part->SetETag(etag);
@@ -345,7 +351,7 @@ Status S3FileWriter::finalize() {
_pending_buf->set_upload_remote_callback(
[this, buf = _pending_buf]() { _put_object(*buf); });
}
- _wait.add();
+ _countdown_event.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
@@ -357,6 +363,8 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
DCHECK(!_closed) << "closed " << _closed;
Aws::S3::Model::PutObjectRequest request;
request.WithBucket(_bucket).WithKey(_key);
+ Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream()));
+ request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
request.SetBody(buf.get_stream());
request.SetContentLength(buf.get_size());
request.SetContentType("application/octet-stream");
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index d8956da88a..2c139242ed 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -18,6 +18,7 @@
#pragma once
#include <aws/core/utils/memory/stl/AWSStringStream.h>
+#include <bthread/countdown_event.h>
#include <cstddef>
#include <list>
@@ -57,48 +58,10 @@ public:
return Status::NotSupported("not support");
}
- int64_t upload_cost_ms() const { return *_upload_cost_ms; }
+ [[nodiscard]] int64_t upload_cost_ms() const { return *_upload_cost_ms; }
private:
- class WaitGroup {
- public:
- WaitGroup() = default;
-
- ~WaitGroup() = default;
-
- WaitGroup(const WaitGroup&) = delete;
- WaitGroup(WaitGroup&&) = delete;
- void operator=(const WaitGroup&) = delete;
- void operator=(WaitGroup&&) = delete;
- // add one counter indicating one more concurrent worker
- void add(int count = 1) { _count += count; }
-
- // decrease count if one concurrent worker finished it's work
- void done() {
- _count--;
- if (_count.load() <= 0) {
- _cv.notify_all();
- }
- }
-
- // wait for all concurrent workers finish their work and return true
- // would return false if timeout, default timeout would be 5min
- bool wait(int64_t timeout_seconds = 300) {
- if (_count.load() <= 0) {
- return true;
- }
- std::unique_lock<std::mutex> lck {_lock};
- _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
- [this]() { return _count.load() <= 0; });
- return _count.load() <= 0;
- }
-
- private:
- std::mutex _lock;
- std::condition_variable _cv;
- std::atomic_int64_t _count {0};
- };
- void _wait_until_finish(std::string task_name);
+ void _wait_until_finish(std::string_view task_name);
Status _complete();
Status _create_multi_upload_request();
void _put_object(S3FileBuffer& buf);
@@ -119,7 +82,8 @@ private:
std::mutex _completed_lock;
std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
- WaitGroup _wait;
+ // **Attention** call add_count() before submitting buf to async thread pool
+ bthread::CountdownEvent _countdown_event {0};
std::atomic_bool _failed = false;
Status _st = Status::OK();
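The "**Attention**" comment above is the key invariant for the new counter: add_count() must happen before the buffer reaches the async pool, otherwise a waiter could observe the count at zero while an upload is still in flight. A small sketch of that ordering (hypothetical helper; std::thread stands in for the buffer pool's worker threads):

    #include <bthread/countdown_event.h>
    #include <functional>
    #include <thread>
    #include <utility>

    void submit_upload(bthread::CountdownEvent& event, std::function<void()> upload) {
        event.add_count();  // register the in-flight task before handing it off
        std::thread([&event, upload = std::move(upload)]() {
            upload();        // the (possibly slow) S3 upload
            event.signal();  // now the waiter may make progress
        }).detach();
    }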
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]