bkietz commented on code in PR #37713:
URL: https://github.com/apache/arrow/pull/37713#discussion_r1330041901
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1454,9 +1454,51 @@ class ObjectOutputStream final : public io::OutputStream
{
// OutputStream interface
+ Status FinishPartUploadAfterFlush() {
+ ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
+ // At this point, all part uploads have finished successfully
+ DCHECK_GT(part_number_, 1);
+ DCHECK_EQ(upload_state_->completed_parts.size(),
+ static_cast<size_t>(part_number_ - 1));
+
+ S3Model::CompletedMultipartUpload completed_upload;
+ completed_upload.SetParts(upload_state_->completed_parts);
+ S3Model::CompleteMultipartUploadRequest req;
+ req.SetBucket(ToAwsString(path_.bucket));
+ req.SetKey(ToAwsString(path_.key));
+ req.SetUploadId(upload_id_);
+ req.SetMultipartUpload(std::move(completed_upload));
+
+ auto outcome =
+ client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
+ if (!outcome.IsSuccess()) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When completing multiple part upload for key '",
+ path_.key, "' in bucket '", path_.bucket, "': "),
+ "CompleteMultipartUpload", outcome.GetError());
+ }
+
+ holder_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
Status Close() override {
- auto fut = CloseAsync();
- return fut.status();
+ if (closed_) return Status::OK();
+
+ if (current_part_) {
+ // Upload last part
+ RETURN_NOT_OK(CommitCurrentPart());
+ }
+
+ // S3 mandates at least one part, upload an empty one if necessary
+ if (part_number_ == 1) {
+ RETURN_NOT_OK(UploadPart("", 0));
+ }
+
Review Comment:
Since this is repeated, please extract it to a `PreFlush` or
`EnsureReadyToFlushFromClose` member function
##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -590,6 +590,17 @@ class TestS3FS : public S3TestMixin {
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}
+ void TestOpenOutputStreamCloseAsyncDestructor() {
+ std::shared_ptr<io::OutputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
+ ASSERT_OK(stream->Write("new data"));
+ // Destructor implicitly closes stream and completes the multipart upload.
+ auto closeAsyncFut = stream->CloseAsync();
Review Comment:
IIUC, the intent of this test is to verify that even if we call
`stream.reset()` like this we will still have non-segfaulting `closeAsyncFut`.
Please add a comment making it clear that for that purpose it doesn't matter
whether flush is triggered asynchronously after CloseAsync or synchronously
after stream.reset() (which could be confusing since this test doesn't ensure
ordering between the two)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]