OliLay commented on code in PR #41564:
URL: https://github.com/apache/arrow/pull/41564#discussion_r1601474966
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1768,40 +1846,77 @@ class ObjectOutputStream final : public
io::OutputStream {
}
// Wait for background writes to finish
std::unique_lock<std::mutex> lock(upload_state_->mutex);
- return upload_state_->pending_parts_completed;
+ return upload_state_->pending_uploads_completed;
}
// Upload-related helpers
Status CommitCurrentPart() {
+ if (!is_multipart_created_) {
+ RETURN_NOT_OK(CreateMultipartUpload());
+ }
+
ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
current_part_.reset();
current_part_size_ = 0;
return UploadPart(buf);
}
- Status UploadPart(std::shared_ptr<Buffer> buffer) {
- return UploadPart(buffer->data(), buffer->size(), buffer);
+ Status UploadUsingSingleRequest() {
+ std::shared_ptr<Buffer> buf;
Review Comment:
We would need another variable for that to save that we issued the single
part upload. Considering this, not sure if is worth adding that, considering
that we only issue a single part upload in `Close()` anyways. (+ tests would
complain that the object already exists if we would trigger both requests,
independent in which order I would assume)
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1838,9 +1952,114 @@ class ObjectOutputStream final : public
io::OutputStream {
return Status::OK();
}
- static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
- int part_number, const
S3Model::UploadPartRequest& req,
- const Result<S3Model::UploadPartOutcome>&
result) {
+ static Status UploadError(const Aws::S3::Model::PutObjectRequest& request,
+ const Aws::S3::Model::PutObjectOutcome& outcome) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When uploading object with key '",
request.GetKey(),
+ "' in bucket '", request.GetBucket(), "': "),
+ "PutObject", outcome.GetError());
+ }
+
+ Status UploadUsingSingleRequest(std::shared_ptr<Buffer> buffer) {
+ return UploadUsingSingleRequest(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status UploadUsingSingleRequest(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer =
nullptr) {
+ auto sync_result_callback = [](const Aws::S3::Model::PutObjectRequest&
request,
+ std::shared_ptr<UploadState> state,
+ int32_t part_number,
+ Aws::S3::Model::PutObjectOutcome outcome) {
+ if (!outcome.IsSuccess()) {
+ return UploadError(request, outcome);
+ }
+ return Status::OK();
+ };
+
+ auto async_result_callback = [](const Aws::S3::Model::PutObjectRequest&
request,
Review Comment:
Yes, previously in the sync case we also directly called `AddCompletedPart`
which just used the `state` to fiddle around with its `completed_parts` member.
For a single upload request, I think we can omit this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]