mapleFU commented on code in PR #41564:
URL: https://github.com/apache/arrow/pull/41564#discussion_r1601154419
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1597,6 +1632,21 @@ class ObjectOutputStream final : public io::OutputStream {
     }
     upload_id_ = outcome.GetResult().GetUploadId();
     upload_state_ = std::make_shared<UploadState>();
Review Comment:
Would this create a duplicate `upload_state_` when shifting from a single-part request?
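
For illustration, a guard along these lines would make the create-once invariant explicit (a sketch only, not the PR's code; it assumes `upload_state_` may already exist from the single-part path):

```cpp
// Sketch: reuse any existing state when a stream that started as a
// single-part request shifts to multipart, so that futures already
// registered on upload_state_ are not silently dropped.
if (upload_state_ == nullptr) {
  upload_state_ = std::make_shared<UploadState>();
}
```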
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1606,42 +1656,60 @@ class ObjectOutputStream final : public io::OutputStream {
       return Status::OK();
     }
-    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    if (is_multipart_created_) {
+      ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
-    S3Model::AbortMultipartUploadRequest req;
-    req.SetBucket(ToAwsString(path_.bucket));
-    req.SetKey(ToAwsString(path_.key));
-    req.SetUploadId(upload_id_);
+      S3Model::AbortMultipartUploadRequest req;
+      req.SetBucket(ToAwsString(path_.bucket));
+      req.SetKey(ToAwsString(path_.key));
+      req.SetUploadId(upload_id_);
-    auto outcome = client_lock.Move()->AbortMultipartUpload(req);
-    if (!outcome.IsSuccess()) {
-      return ErrorToStatus(
-          std::forward_as_tuple("When aborting multiple part upload for key '", path_.key,
-                                "' in bucket '", path_.bucket, "': "),
-          "AbortMultipartUpload", outcome.GetError());
+      auto outcome = client_lock.Move()->AbortMultipartUpload(req);
+      if (!outcome.IsSuccess()) {
+        return ErrorToStatus(
+            std::forward_as_tuple("When aborting multiple part upload for key '",
+                                  path_.key, "' in bucket '", path_.bucket, "': "),
+            "AbortMultipartUpload", outcome.GetError());
+      }
     }
+
     current_part_.reset();
     holder_ = nullptr;
     closed_ = true;
+
     return Status::OK();
   }
   // OutputStream interface
+  bool ShouldBeMultipartUpload() { return pos_ > kMultiPartUploadThresholdSize; }
+
+  bool IsMultipartUpload() { return ShouldBeMultipartUpload() || is_multipart_created_; }
Review Comment:
1. Why are two functions added here?
2. Should they be `const` member functions? A sketch of what I mean is below.
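
For point 2, same bodies as in the diff, just marked `const`, since neither helper mutates the stream (a sketch, not the PR's code):

```cpp
bool ShouldBeMultipartUpload() const { return pos_ > kMultiPartUploadThresholdSize; }

bool IsMultipartUpload() const { return ShouldBeMultipartUpload() || is_multipart_created_; }
```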
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1606,42 +1656,60 @@ class ObjectOutputStream final : public io::OutputStream {
       return Status::OK();
     }
-    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    if (is_multipart_created_) {
+      ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
-    S3Model::AbortMultipartUploadRequest req;
-    req.SetBucket(ToAwsString(path_.bucket));
-    req.SetKey(ToAwsString(path_.key));
-    req.SetUploadId(upload_id_);
+      S3Model::AbortMultipartUploadRequest req;
+      req.SetBucket(ToAwsString(path_.bucket));
+      req.SetKey(ToAwsString(path_.key));
+      req.SetUploadId(upload_id_);
-    auto outcome = client_lock.Move()->AbortMultipartUpload(req);
-    if (!outcome.IsSuccess()) {
-      return ErrorToStatus(
-          std::forward_as_tuple("When aborting multiple part upload for key '", path_.key,
-                                "' in bucket '", path_.bucket, "': "),
-          "AbortMultipartUpload", outcome.GetError());
+      auto outcome = client_lock.Move()->AbortMultipartUpload(req);
+      if (!outcome.IsSuccess()) {
+        return ErrorToStatus(
+            std::forward_as_tuple("When aborting multiple part upload for key '",
+                                  path_.key, "' in bucket '", path_.bucket, "': "),
+            "AbortMultipartUpload", outcome.GetError());
+      }
     }
+
     current_part_.reset();
     holder_ = nullptr;
     closed_ = true;
+
     return Status::OK();
   }
   // OutputStream interface
+  bool ShouldBeMultipartUpload() { return pos_ > kMultiPartUploadThresholdSize; }
+
+  bool IsMultipartUpload() { return ShouldBeMultipartUpload() || is_multipart_created_; }
+
   Status EnsureReadyToFlushFromClose() {
-    if (current_part_) {
-      // Upload last part
-      RETURN_NOT_OK(CommitCurrentPart());
-    }
+    if (IsMultipartUpload()) {
Review Comment:
Can this actually happen when `part_size_` is large enough that nothing has been uploaded yet? A sketch of the case I mean follows.
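
To spell out the scenario (an illustrative sketch with an assumed helper name, not the PR's code): a single write just over the threshold, with `part_size_` larger than the threshold, leaves everything buffered:

```cpp
// Hypothetical helper for illustration: writes threshold + 1 bytes in one call.
// Afterwards pos_ > kMultiPartUploadThresholdSize, so ShouldBeMultipartUpload()
// returns true, yet is_multipart_created_ is still false and every byte still
// sits in current_part_; nothing has gone to S3 when Close() is called.
Status WriteJustOverThreshold(io::OutputStream* stream, const uint8_t* data,
                              int64_t threshold) {
  return stream->Write(data, threshold + 1);
}
```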
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1560,34 +1582,47 @@ class ObjectOutputStream final : public io::OutputStream {
         path_(path),
         metadata_(metadata),
         default_metadata_(options.default_metadata),
-        background_writes_(options.background_writes) {}
+        background_writes_(options.background_writes),
+        sanitize_bucket_on_open_(options.sanitize_bucket_on_open) {}
   ~ObjectOutputStream() override {
     // For compliance with the rest of the IO stack, Close rather than Abort,
     // even though it may be more expensive.
     io::internal::CloseFromDestructor(this);
   }
-  Status Init() {
-    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+  template <typename ObjectRequest>
+  Status SetMetadataInRequest(ObjectRequest* request) {
+    std::shared_ptr<const KeyValueMetadata> metadata;
-    // Initiate the multi-part upload
-    S3Model::CreateMultipartUploadRequest req;
-    req.SetBucket(ToAwsString(path_.bucket));
-    req.SetKey(ToAwsString(path_.key));
     if (metadata_ && metadata_->size() != 0) {
-      RETURN_NOT_OK(SetObjectMetadata(metadata_, &req));
+      metadata = metadata_;
     } else if (default_metadata_ && default_metadata_->size() != 0) {
-      RETURN_NOT_OK(SetObjectMetadata(default_metadata_, &req));
+      metadata = default_metadata_;
     }
-    // If we do not set anything then the SDK will default to application/xml
-    // which confuses some tools (https://github.com/apache/arrow/issues/11934)
-    // So we instead default to application/octet-stream which is less misleading
-    if (!req.ContentTypeHasBeenSet()) {
-      req.SetContentType("application/octet-stream");
+    if (metadata == nullptr ||
+        !metadata->Contains(ObjectMetadataSetter<ObjectRequest>::CONTENT_TYPE_KEY)) {
+      // If we do not set anything then the SDK will default to application/xml
+      // which confuses some tools (https://github.com/apache/arrow/issues/11934)
+      // So we instead default to application/octet-stream which is less misleading
+      request->SetContentType("application/octet-stream");
+    } else {
+      RETURN_NOT_OK(SetObjectMetadata(metadata, request));
     }
+    return Status::OK();
+  }
+
+  Status CreateMultipartUpload() {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
Review Comment:
Can we ensure that the multipart upload has not already been created when this runs?
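
If the answer is "by construction", a debug assertion would document the invariant (a sketch, using the `is_multipart_created_` flag from the hunks above):

```cpp
Status CreateMultipartUpload() {
  // Sketch: fail fast in debug builds if the multipart upload already exists.
  DCHECK(!is_multipart_created_);
  ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
  // ... rest as in the PR ...
}
```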