pitrou commented on code in PR #41564:
URL: https://github.com/apache/arrow/pull/41564#discussion_r1624556269
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1293,12 +1295,14 @@ std::shared_ptr<const KeyValueMetadata>
GetObjectMetadata(const ObjectResult& re
template <typename ObjectRequest>
struct ObjectMetadataSetter {
+ static constexpr std::string_view kContentTypeKey = "Content-Type";
Review Comment:
"Content-Type" is still used as a literal above. Should we move this to the
top level and use it everywhere?
Or, conversely, just undo this, since the "Content-Type" literal is unlikely
to change value...
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1606,42 +1656,62 @@ class ObjectOutputStream final : public
io::OutputStream {
return Status::OK();
}
- ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+ if (is_multipart_created_) {
+ ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
- S3Model::AbortMultipartUploadRequest req;
- req.SetBucket(ToAwsString(path_.bucket));
- req.SetKey(ToAwsString(path_.key));
- req.SetUploadId(upload_id_);
+ S3Model::AbortMultipartUploadRequest req;
+ req.SetBucket(ToAwsString(path_.bucket));
+ req.SetKey(ToAwsString(path_.key));
+ req.SetUploadId(upload_id_);
- auto outcome = client_lock.Move()->AbortMultipartUpload(req);
- if (!outcome.IsSuccess()) {
- return ErrorToStatus(
- std::forward_as_tuple("When aborting multiple part upload for key
'", path_.key,
- "' in bucket '", path_.bucket, "': "),
- "AbortMultipartUpload", outcome.GetError());
+ auto outcome = client_lock.Move()->AbortMultipartUpload(req);
+ if (!outcome.IsSuccess()) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When aborting multiple part upload for key
'",
+ path_.key, "' in bucket '", path_.bucket,
"': "),
+ "AbortMultipartUpload", outcome.GetError());
+ }
}
+
current_part_.reset();
holder_ = nullptr;
closed_ = true;
+
return Status::OK();
}
// OutputStream interface
+ bool ShouldBeMultipartUpload() const { return pos_ >
kMultiPartUploadThresholdSize; }
Review Comment:
Why not instead
```suggestion
bool ShouldBeMultipartUpload() const {
return pos_ > kMultiPartUploadThresholdSize || !allow_delayed_open_;
}
```
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1838,9 +1949,114 @@ class ObjectOutputStream final : public
io::OutputStream {
return Status::OK();
}
- static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
- int part_number, const
S3Model::UploadPartRequest& req,
- const Result<S3Model::UploadPartOutcome>&
result) {
+ static Status UploadError(const Aws::S3::Model::PutObjectRequest& request,
Review Comment:
Perhaps call this `UploadUsingSingleRequestError`?
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1606,42 +1656,62 @@ class ObjectOutputStream final : public
io::OutputStream {
return Status::OK();
}
- ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+ if (is_multipart_created_) {
+ ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
- S3Model::AbortMultipartUploadRequest req;
- req.SetBucket(ToAwsString(path_.bucket));
- req.SetKey(ToAwsString(path_.key));
- req.SetUploadId(upload_id_);
+ S3Model::AbortMultipartUploadRequest req;
+ req.SetBucket(ToAwsString(path_.bucket));
+ req.SetKey(ToAwsString(path_.key));
+ req.SetUploadId(upload_id_);
- auto outcome = client_lock.Move()->AbortMultipartUpload(req);
- if (!outcome.IsSuccess()) {
- return ErrorToStatus(
- std::forward_as_tuple("When aborting multiple part upload for key
'", path_.key,
- "' in bucket '", path_.bucket, "': "),
- "AbortMultipartUpload", outcome.GetError());
+ auto outcome = client_lock.Move()->AbortMultipartUpload(req);
+ if (!outcome.IsSuccess()) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When aborting multiple part upload for key
'",
+ path_.key, "' in bucket '", path_.bucket,
"': "),
+ "AbortMultipartUpload", outcome.GetError());
+ }
}
+
current_part_.reset();
holder_ = nullptr;
closed_ = true;
+
return Status::OK();
}
// OutputStream interface
+ bool ShouldBeMultipartUpload() const { return pos_ >
kMultiPartUploadThresholdSize; }
+
+ bool IsMultipartUpload() const {
+ return ShouldBeMultipartUpload() || is_multipart_created_;
Review Comment:
Why add `is_multipart_created_` here? Is there any situation where
`is_multipart_created_` would be true but `ShouldBeMultipartUpload()` would be
false?
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1560,34 +1582,49 @@ class ObjectOutputStream final : public
io::OutputStream {
path_(path),
metadata_(metadata),
default_metadata_(options.default_metadata),
- background_writes_(options.background_writes) {}
+ background_writes_(options.background_writes),
+ allow_delayed_open_(options.allow_delayed_open) {}
~ObjectOutputStream() override {
// For compliance with the rest of the IO stack, Close rather than Abort,
// even though it may be more expensive.
io::internal::CloseFromDestructor(this);
}
- Status Init() {
- ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+ template <typename ObjectRequest>
+ Status SetMetadataInRequest(ObjectRequest* request) {
+ std::shared_ptr<const KeyValueMetadata> metadata;
- // Initiate the multi-part upload
- S3Model::CreateMultipartUploadRequest req;
- req.SetBucket(ToAwsString(path_.bucket));
- req.SetKey(ToAwsString(path_.key));
if (metadata_ && metadata_->size() != 0) {
- RETURN_NOT_OK(SetObjectMetadata(metadata_, &req));
+ metadata = metadata_;
} else if (default_metadata_ && default_metadata_->size() != 0) {
- RETURN_NOT_OK(SetObjectMetadata(default_metadata_, &req));
+ metadata = default_metadata_;
}
- // If we do not set anything then the SDK will default to application/xml
- // which confuses some tools (https://github.com/apache/arrow/issues/11934)
- // So we instead default to application/octet-stream which is less
misleading
- if (!req.ContentTypeHasBeenSet()) {
- req.SetContentType("application/octet-stream");
+ if (metadata &&
+
metadata->Contains(ObjectMetadataSetter<ObjectRequest>::kContentTypeKey)) {
+ RETURN_NOT_OK(SetObjectMetadata(metadata, request));
+ } else {
+ // If we do not set anything then the SDK will default to application/xml
+ // which confuses some tools
(https://github.com/apache/arrow/issues/11934)
+ // So we instead default to application/octet-stream which is less
misleading
+ request->SetContentType("application/octet-stream");
Review Comment:
So `metadata` is ignored if it doesn't contain a "Content-Type" key? Or am I
missing something here?
##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -1189,40 +1191,55 @@ TEST_F(TestS3FS, OpenInputFile) {
ASSERT_RAISES(IOError, file->Seek(10));
}
-TEST_F(TestS3FS, OpenOutputStreamBackgroundWrites) { TestOpenOutputStream(); }
+struct S3OptionsTestParameters {
+ bool background_writes{false};
+ bool allow_delayed_open{false};
-TEST_F(TestS3FS, OpenOutputStreamSyncWrites) {
- options_.background_writes = false;
- MakeFileSystem();
- TestOpenOutputStream();
-}
-
-TEST_F(TestS3FS, OpenOutputStreamAbortBackgroundWrites) {
TestOpenOutputStreamAbort(); }
+ void apply_to_s3_options(S3Options& options) const {
+ options.background_writes = background_writes;
+ options.allow_delayed_open = allow_delayed_open;
+ }
Review Comment:
Coding style nit: 1) use `CamelCase` for function names, 2) avoid mutable
refs; therefore:
```suggestion
void ApplyToS3Options(S3Options* options) const {
options->background_writes = background_writes;
options->allow_delayed_open = allow_delayed_open;
}
```
##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -1189,40 +1191,55 @@ TEST_F(TestS3FS, OpenInputFile) {
ASSERT_RAISES(IOError, file->Seek(10));
}
-TEST_F(TestS3FS, OpenOutputStreamBackgroundWrites) { TestOpenOutputStream(); }
+struct S3OptionsTestParameters {
+ bool background_writes{false};
+ bool allow_delayed_open{false};
-TEST_F(TestS3FS, OpenOutputStreamSyncWrites) {
- options_.background_writes = false;
- MakeFileSystem();
- TestOpenOutputStream();
-}
-
-TEST_F(TestS3FS, OpenOutputStreamAbortBackgroundWrites) {
TestOpenOutputStreamAbort(); }
+ void apply_to_s3_options(S3Options& options) const {
+ options.background_writes = background_writes;
+ options.allow_delayed_open = allow_delayed_open;
+ }
-TEST_F(TestS3FS, OpenOutputStreamAbortSyncWrites) {
- options_.background_writes = false;
- MakeFileSystem();
- TestOpenOutputStreamAbort();
-}
+ static std::vector<S3OptionsTestParameters> GetCartesianProduct() {
+ return {
+ S3OptionsTestParameters{true, false},
+ S3OptionsTestParameters{false, false},
+ S3OptionsTestParameters{true, true},
+ S3OptionsTestParameters{false, true},
+ };
+ }
+};
-TEST_F(TestS3FS, OpenOutputStreamDestructorBackgroundWrites) {
- TestOpenOutputStreamDestructor();
+TEST_F(TestS3FS, OpenOutputStream) {
+ for (const auto& combination :
S3OptionsTestParameters::GetCartesianProduct()) {
+ combination.apply_to_s3_options(options_);
+ MakeFileSystem();
Review Comment:
I don't think this is deleting the files currently on the filesystem, which
means the tests might succeed even if the write path doesn't work for some
options.
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1853,8 +2069,8 @@ class ObjectOutputStream final : public io::OutputStream {
}
}
// Notify completion
- if (--state->parts_in_progress == 0) {
- state->pending_parts_completed.MarkFinished(state->status);
+ if (--state->uploads_in_progress == 0) {
+ state->pending_uploads_completed.MarkFinished(state->status);
Review Comment:
Same here.
##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -1189,40 +1191,55 @@ TEST_F(TestS3FS, OpenInputFile) {
ASSERT_RAISES(IOError, file->Seek(10));
}
-TEST_F(TestS3FS, OpenOutputStreamBackgroundWrites) { TestOpenOutputStream(); }
+struct S3OptionsTestParameters {
+ bool background_writes{false};
+ bool allow_delayed_open{false};
-TEST_F(TestS3FS, OpenOutputStreamSyncWrites) {
- options_.background_writes = false;
- MakeFileSystem();
- TestOpenOutputStream();
-}
-
-TEST_F(TestS3FS, OpenOutputStreamAbortBackgroundWrites) {
TestOpenOutputStreamAbort(); }
+ void apply_to_s3_options(S3Options& options) const {
+ options.background_writes = background_writes;
+ options.allow_delayed_open = allow_delayed_open;
+ }
-TEST_F(TestS3FS, OpenOutputStreamAbortSyncWrites) {
- options_.background_writes = false;
- MakeFileSystem();
- TestOpenOutputStreamAbort();
-}
+ static std::vector<S3OptionsTestParameters> GetCartesianProduct() {
+ return {
+ S3OptionsTestParameters{true, false},
+ S3OptionsTestParameters{false, false},
+ S3OptionsTestParameters{true, true},
+ S3OptionsTestParameters{false, true},
+ };
+ }
+};
-TEST_F(TestS3FS, OpenOutputStreamDestructorBackgroundWrites) {
- TestOpenOutputStreamDestructor();
+TEST_F(TestS3FS, OpenOutputStream) {
+ for (const auto& combination :
S3OptionsTestParameters::GetCartesianProduct()) {
+ combination.apply_to_s3_options(options_);
Review Comment:
Ideally we would also leave a trace in the test log to make diagnosing
failures easier (see the `ARROW_SCOPED_TRACE` macro somewhere).
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1873,24 +2089,18 @@ class ObjectOutputStream final : public
io::OutputStream {
state->completed_parts[slot] = std::move(part);
}
- static Status UploadPartError(const S3Model::UploadPartRequest& req,
- const S3Model::UploadPartOutcome& outcome) {
- return ErrorToStatus(
- std::forward_as_tuple("When uploading part for key '", req.GetKey(),
- "' in bucket '", req.GetBucket(), "': "),
- "UploadPart", outcome.GetError());
- }
-
protected:
std::shared_ptr<S3ClientHolder> holder_;
const io::IOContext io_context_;
const S3Path path_;
const std::shared_ptr<const KeyValueMetadata> metadata_;
const std::shared_ptr<const KeyValueMetadata> default_metadata_;
const bool background_writes_;
+ const bool allow_delayed_open_;
Aws::String upload_id_;
bool closed_ = true;
+ bool is_multipart_created_ = false;
Review Comment:
For the record, `is_multipart_created_ == true` iff `!upload_id_.empty()`,
right? Perhaps this can be consolidated and we can rename `upload_id_` to
something more explicit, such as `multipart_upload_id`.
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1838,9 +1949,114 @@ class ObjectOutputStream final : public
io::OutputStream {
return Status::OK();
}
- static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
- int part_number, const
S3Model::UploadPartRequest& req,
- const Result<S3Model::UploadPartOutcome>&
result) {
+ static Status UploadError(const Aws::S3::Model::PutObjectRequest& request,
+ const Aws::S3::Model::PutObjectOutcome& outcome) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When uploading object with key '",
request.GetKey(),
+ "' in bucket '", request.GetBucket(), "': "),
+ "PutObject", outcome.GetError());
+ }
+
+ Status UploadUsingSingleRequest(std::shared_ptr<Buffer> buffer) {
+ return UploadUsingSingleRequest(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status UploadUsingSingleRequest(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer =
nullptr) {
+ auto sync_result_callback = [](const Aws::S3::Model::PutObjectRequest&
request,
+ std::shared_ptr<UploadState> state,
+ int32_t part_number,
+ Aws::S3::Model::PutObjectOutcome outcome) {
+ if (!outcome.IsSuccess()) {
+ return UploadError(request, outcome);
+ }
+ return Status::OK();
+ };
+
+ auto async_result_callback = [](const Aws::S3::Model::PutObjectRequest&
request,
+ std::shared_ptr<UploadState> state,
+ int32_t part_number,
+ Aws::S3::Model::PutObjectOutcome outcome) {
+ HandleUploadUsingSingleRequestOutcome(state, request,
outcome.GetResult());
+ return Status::OK();
+ };
+
+ Aws::S3::Model::PutObjectRequest req{};
+ RETURN_NOT_OK(SetMetadataInRequest(&req));
+
+ return Upload<Aws::S3::Model::PutObjectRequest,
Aws::S3::Model::PutObjectOutcome>(
+ std::move(req), std::move(sync_result_callback),
std::move(async_result_callback),
+ data, nbytes, std::move(owned_buffer));
+ }
+
+ Status UploadPart(std::shared_ptr<Buffer> buffer) {
+ return UploadPart(buffer->data(), buffer->size(), buffer);
+ }
+
+ static Status UploadPartError(const Aws::S3::Model::UploadPartRequest&
request,
+ const Aws::S3::Model::UploadPartOutcome&
outcome) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When uploading part for key '",
request.GetKey(),
+ "' in bucket '", request.GetBucket(), "': "),
+ "UploadPart", outcome.GetError());
+ }
+
+ Status UploadPart(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ if (!is_multipart_created_) {
+ RETURN_NOT_OK(CreateMultipartUpload());
+ }
+
+ Aws::S3::Model::UploadPartRequest req{};
+ req.SetPartNumber(part_number_);
+ req.SetUploadId(upload_id_);
+
+ auto sync_result_callback = [](const Aws::S3::Model::UploadPartRequest&
request,
+ std::shared_ptr<UploadState> state,
+ int32_t part_number,
+ Aws::S3::Model::UploadPartOutcome outcome) {
+ if (!outcome.IsSuccess()) {
+ return UploadPartError(request, outcome);
+ } else {
+ AddCompletedPart(state, part_number, outcome.GetResult());
+ }
+
+ return Status::OK();
+ };
+
+ auto async_result_callback = [](const Aws::S3::Model::UploadPartRequest&
request,
+ std::shared_ptr<UploadState> state,
+ int32_t part_number,
+ Aws::S3::Model::UploadPartOutcome outcome)
{
+ HandleUploadPartOutcome(state, part_number, request,
outcome.GetResult());
+ return Status::OK();
+ };
+
+ return Upload<Aws::S3::Model::UploadPartRequest,
Aws::S3::Model::UploadPartOutcome>(
+ std::move(req), std::move(sync_result_callback),
std::move(async_result_callback),
+ data, nbytes, std::move(owned_buffer));
+ }
+
+ static void HandleUploadUsingSingleRequestOutcome(
+ const std::shared_ptr<UploadState>& state, const
S3Model::PutObjectRequest& req,
+ const Result<S3Model::PutObjectOutcome>& result) {
+ std::unique_lock<std::mutex> lock(state->mutex);
+ if (!result.ok()) {
+ state->status &= result.status();
+ } else {
+ const auto& outcome = *result;
+ if (!outcome.IsSuccess()) {
+ state->status &= UploadError(req, outcome);
+ }
+ }
+
+ state->pending_uploads_completed.MarkFinished(state->status);
Review Comment:
Based on https://github.com/apache/arrow/pull/41876/files, we probably want
to unlock the mutex before marking the future finished.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]