pitrou commented on code in PR #41564:
URL: https://github.com/apache/arrow/pull/41564#discussion_r1679534888
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1583,6 +1595,14 @@ class ObjectInputFile final : public
io::RandomAccessFile {
// (for rational, see: https://github.com/apache/arrow/issues/34363)
static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;
+// Above this threshold, use a multi-part upload instead of a single request
upload. Only
+// relevant if early sanitization of writing to the bucket is activated.
Review Comment:
Isn't it the reverse? For example:
```suggestion
// Above this threshold, use a multi-part upload instead of a single request
upload.
// Only relevant if early sanitization of writing to the bucket is disabled
// (see `allow_delayed_open`).
```
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1880,9 +1991,117 @@ class ObjectOutputStream final : public
io::OutputStream {
return Status::OK();
}
- static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
- int part_number, const
S3Model::UploadPartRequest& req,
- const Result<S3Model::UploadPartOutcome>&
result) {
+ static Status UploadUsingSingleRequestError(
+ const Aws::S3::Model::PutObjectRequest& request,
+ const Aws::S3::Model::PutObjectOutcome& outcome) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When uploading object with key '",
request.GetKey(),
+ "' in bucket '", request.GetBucket(), "': "),
+ "PutObject", outcome.GetError());
+ }
+
+ Status UploadUsingSingleRequest(std::shared_ptr<Buffer> buffer) {
+ return UploadUsingSingleRequest(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status UploadUsingSingleRequest(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer =
nullptr) {
+ auto sync_result_callback = [](const Aws::S3::Model::PutObjectRequest&
request,
+ std::shared_ptr<UploadState> state,
+ int32_t part_number,
+ Aws::S3::Model::PutObjectOutcome outcome) {
+ if (!outcome.IsSuccess()) {
+ return UploadUsingSingleRequestError(request, outcome);
+ }
+ return Status::OK();
+ };
+
+ auto async_result_callback = [](const Aws::S3::Model::PutObjectRequest&
request,
+ std::shared_ptr<UploadState> state,
+ int32_t part_number,
+ Aws::S3::Model::PutObjectOutcome outcome) {
+ HandleUploadUsingSingleRequestOutcome(state, request,
outcome.GetResult());
+ return Status::OK();
+ };
+
+ Aws::S3::Model::PutObjectRequest req{};
+ RETURN_NOT_OK(SetMetadataInRequest(&req));
+
+ return Upload<Aws::S3::Model::PutObjectRequest,
Aws::S3::Model::PutObjectOutcome>(
+ std::move(req), std::move(sync_result_callback),
std::move(async_result_callback),
+ data, nbytes, std::move(owned_buffer));
+ }
+
+ Status UploadPart(std::shared_ptr<Buffer> buffer) {
+ return UploadPart(buffer->data(), buffer->size(), buffer);
+ }
+
+ static Status UploadPartError(const Aws::S3::Model::UploadPartRequest&
request,
+ const Aws::S3::Model::UploadPartOutcome&
outcome) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When uploading part for key '",
request.GetKey(),
+ "' in bucket '", request.GetBucket(), "': "),
+ "UploadPart", outcome.GetError());
+ }
+
+ Status UploadPart(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ if (!IsMultipartCreated()) {
+ RETURN_NOT_OK(CreateMultipartUpload());
+ }
+
+ Aws::S3::Model::UploadPartRequest req{};
+ req.SetPartNumber(part_number_);
+ req.SetUploadId(multipart_upload_id_);
+
+ auto sync_result_callback = [](const Aws::S3::Model::UploadPartRequest&
request,
+ std::shared_ptr<UploadState> state,
+ int32_t part_number,
+ Aws::S3::Model::UploadPartOutcome outcome) {
+ if (!outcome.IsSuccess()) {
+ return UploadPartError(request, outcome);
+ } else {
+ AddCompletedPart(state, part_number, outcome.GetResult());
+ }
+
+ return Status::OK();
+ };
+
+ auto async_result_callback = [](const Aws::S3::Model::UploadPartRequest&
request,
+ std::shared_ptr<UploadState> state,
+ int32_t part_number,
+ Aws::S3::Model::UploadPartOutcome outcome)
{
+ HandleUploadPartOutcome(state, part_number, request,
outcome.GetResult());
+ return Status::OK();
+ };
+
+ return Upload<Aws::S3::Model::UploadPartRequest,
Aws::S3::Model::UploadPartOutcome>(
+ std::move(req), std::move(sync_result_callback),
std::move(async_result_callback),
+ data, nbytes, std::move(owned_buffer));
+ }
+
+ static void HandleUploadUsingSingleRequestOutcome(
+ const std::shared_ptr<UploadState>& state, const
S3Model::PutObjectRequest& req,
+ const Result<S3Model::PutObjectOutcome>& result) {
+ std::unique_lock<std::mutex> lock(state->mutex);
+ if (!result.ok()) {
+ state->status &= result.status();
+ } else {
+ const auto& outcome = *result;
+ if (!outcome.IsSuccess()) {
+ state->status &= UploadUsingSingleRequestError(req, outcome);
+ }
+ }
+ // GH-41862: avoid potential deadlock if the Future's callback is called
+ // with the mutex taken.
+ lock.unlock();
+ state->pending_uploads_completed.MarkFinished(state->status);
Review Comment:
I suggest to use the same idiom as below, to avoid a race condition
accessing the future:
```suggestion
auto fut = state->pending_uploads_completed;
lock.unlock();
fut.MarkFinished(state->status);
```
##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -478,6 +463,74 @@ class TestS3FS : public S3TestMixin {
S3TestMixin::TearDown();
}
+ Status PopulateTestBucket() {
+ Aws::S3::Model::PutObjectRequest req;
+ req.SetBucket(ToAwsString("bucket"));
+ req.SetKey(ToAwsString("emptydir/"));
+ req.SetBody(std::make_shared<std::stringstream>(""));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ // NOTE: no need to create intermediate "directories" somedir/ and
+ // somedir/subdir/
+ req.SetKey(ToAwsString("somedir/subdir/subfile"));
+ req.SetBody(std::make_shared<std::stringstream>("sub data"));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ req.SetKey(ToAwsString("somefile"));
+ req.SetBody(std::make_shared<std::stringstream>("some data"));
+ req.SetContentType("x-arrow/test");
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ req.SetKey(ToAwsString("otherdir/1/2/3/otherfile"));
+ req.SetBody(std::make_shared<std::stringstream>("other data"));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+
+ return Status::OK();
+ }
+
+ Status RestoreTestBucket() {
+ // First empty the test bucket, and then re-upload initial test files.
+
+ Aws::Vector<Aws::S3::Model::Object> all_objects;
+ {
+ // Mostly taken from
+ //
https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/cpp/example_code/s3/list_objects.cpp
+ Aws::S3::Model::ListObjectsV2Request req;
+ req.WithBucket(Aws::String{"bucket"});
+
+ Aws::String continuation_token;
+ do {
+ if (!continuation_token.empty()) {
+ req.SetContinuationToken(continuation_token);
+ }
+
+ auto outcome = client_->ListObjectsV2(req);
+
+ if (!outcome.IsSuccess()) {
+ return OutcomeToStatus("ListObjectsV2", outcome);
+ } else {
+ Aws::Vector<Aws::S3::Model::Object> objects =
outcome.GetResult().GetContents();
+ all_objects.insert(all_objects.end(), objects.begin(),
objects.end());
+ continuation_token = outcome.GetResult().GetNextContinuationToken();
+ }
+ } while (!continuation_token.empty());
+ }
+
+ {
+ Aws::S3::Model::DeleteObjectsRequest req;
+
+ Aws::S3::Model::Delete delete_object;
+ for (const auto& object : all_objects) {
+ delete_object.AddObjects(
+ Aws::S3::Model::ObjectIdentifier().WithKey(object.GetKey()));
+ }
+
+ req.SetDelete(delete_object);
Review Comment:
Probably unimportant, but we can avoid a copy of the objects vector here.
```suggestion
req.SetDelete(std::move(delete_object));
```
##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -23,6 +23,8 @@
#include <utility>
#include <vector>
+#include <aws/s3/model/DeleteObjectsRequest.h>
+#include <aws/s3/model/ListObjectsV2Request.h>
Review Comment:
Can you move those below with other AWS includes?
##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -478,6 +463,74 @@ class TestS3FS : public S3TestMixin {
S3TestMixin::TearDown();
}
+ Status PopulateTestBucket() {
+ Aws::S3::Model::PutObjectRequest req;
+ req.SetBucket(ToAwsString("bucket"));
+ req.SetKey(ToAwsString("emptydir/"));
+ req.SetBody(std::make_shared<std::stringstream>(""));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ // NOTE: no need to create intermediate "directories" somedir/ and
+ // somedir/subdir/
+ req.SetKey(ToAwsString("somedir/subdir/subfile"));
+ req.SetBody(std::make_shared<std::stringstream>("sub data"));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ req.SetKey(ToAwsString("somefile"));
+ req.SetBody(std::make_shared<std::stringstream>("some data"));
+ req.SetContentType("x-arrow/test");
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ req.SetKey(ToAwsString("otherdir/1/2/3/otherfile"));
+ req.SetBody(std::make_shared<std::stringstream>("other data"));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+
+ return Status::OK();
+ }
+
+ Status RestoreTestBucket() {
+ // First empty the test bucket, and then re-upload initial test files.
+
+ Aws::Vector<Aws::S3::Model::Object> all_objects;
+ {
+ // Mostly taken from
+ //
https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/cpp/example_code/s3/list_objects.cpp
+ Aws::S3::Model::ListObjectsV2Request req;
+ req.WithBucket(Aws::String{"bucket"});
+
+ Aws::String continuation_token;
+ do {
+ if (!continuation_token.empty()) {
+ req.SetContinuationToken(continuation_token);
+ }
+
+ auto outcome = client_->ListObjectsV2(req);
+
+ if (!outcome.IsSuccess()) {
+ return OutcomeToStatus("ListObjectsV2", outcome);
+ } else {
+ Aws::Vector<Aws::S3::Model::Object> objects =
outcome.GetResult().GetContents();
+ all_objects.insert(all_objects.end(), objects.begin(),
objects.end());
Review Comment:
Note that you could incrementally populate the `delete_object` here instead
of going through an intermediate vector `all_objects`.
##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -478,6 +463,74 @@ class TestS3FS : public S3TestMixin {
S3TestMixin::TearDown();
}
+ Status PopulateTestBucket() {
+ Aws::S3::Model::PutObjectRequest req;
+ req.SetBucket(ToAwsString("bucket"));
+ req.SetKey(ToAwsString("emptydir/"));
+ req.SetBody(std::make_shared<std::stringstream>(""));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ // NOTE: no need to create intermediate "directories" somedir/ and
+ // somedir/subdir/
+ req.SetKey(ToAwsString("somedir/subdir/subfile"));
+ req.SetBody(std::make_shared<std::stringstream>("sub data"));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ req.SetKey(ToAwsString("somefile"));
+ req.SetBody(std::make_shared<std::stringstream>("some data"));
+ req.SetContentType("x-arrow/test");
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ req.SetKey(ToAwsString("otherdir/1/2/3/otherfile"));
+ req.SetBody(std::make_shared<std::stringstream>("other data"));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+
+ return Status::OK();
+ }
+
+ Status RestoreTestBucket() {
+ // First empty the test bucket, and then re-upload initial test files.
+
+ Aws::Vector<Aws::S3::Model::Object> all_objects;
+ {
+ // Mostly taken from
+ //
https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/cpp/example_code/s3/list_objects.cpp
+ Aws::S3::Model::ListObjectsV2Request req;
+ req.WithBucket(Aws::String{"bucket"});
Review Comment:
Nit: no need to use the reference-returning methods here
```suggestion
req.SetBucket(Aws::String{"bucket"});
```
##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -584,6 +639,8 @@ class TestS3FS : public S3TestMixin {
ASSERT_OK(stream->Close());
ASSERT_TRUE(weak_fs.expired());
AssertObjectContents(client_.get(), "bucket", "newfile99", "some other
data");
+
+ ASSERT_OK(RestoreTestBucket());
Review Comment:
Can we move the `RestoreTestBucket` calls into the actual test loops?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]