pitrou commented on code in PR #41564:
URL: https://github.com/apache/arrow/pull/41564#discussion_r1624556269


##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1293,12 +1295,14 @@ std::shared_ptr<const KeyValueMetadata> 
GetObjectMetadata(const ObjectResult& re
 
 template <typename ObjectRequest>
 struct ObjectMetadataSetter {
+  static constexpr std::string_view kContentTypeKey = "Content-Type";

Review Comment:
   "Content-Type" is still used as a literal above. Should we move this to the 
top level and use it everywhere?
   Or, conversely, just undo this, since the "Content-Type" literal is unlikely 
to change value...



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1606,42 +1656,62 @@ class ObjectOutputStream final : public 
io::OutputStream {
       return Status::OK();
     }
 
-    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    if (is_multipart_created_) {
+      ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
 
-    S3Model::AbortMultipartUploadRequest req;
-    req.SetBucket(ToAwsString(path_.bucket));
-    req.SetKey(ToAwsString(path_.key));
-    req.SetUploadId(upload_id_);
+      S3Model::AbortMultipartUploadRequest req;
+      req.SetBucket(ToAwsString(path_.bucket));
+      req.SetKey(ToAwsString(path_.key));
+      req.SetUploadId(upload_id_);
 
-    auto outcome = client_lock.Move()->AbortMultipartUpload(req);
-    if (!outcome.IsSuccess()) {
-      return ErrorToStatus(
-          std::forward_as_tuple("When aborting multiple part upload for key 
'", path_.key,
-                                "' in bucket '", path_.bucket, "': "),
-          "AbortMultipartUpload", outcome.GetError());
+      auto outcome = client_lock.Move()->AbortMultipartUpload(req);
+      if (!outcome.IsSuccess()) {
+        return ErrorToStatus(
+            std::forward_as_tuple("When aborting multiple part upload for key 
'",
+                                  path_.key, "' in bucket '", path_.bucket, 
"': "),
+            "AbortMultipartUpload", outcome.GetError());
+      }
     }
+
     current_part_.reset();
     holder_ = nullptr;
     closed_ = true;
+
     return Status::OK();
   }
 
   // OutputStream interface
 
+  bool ShouldBeMultipartUpload() const { return pos_ > 
kMultiPartUploadThresholdSize; }

Review Comment:
   Why not do this instead?
   ```suggestion
     bool ShouldBeMultipartUpload() const {
       return pos_ > kMultiPartUploadThresholdSize || !allow_delayed_open_;
     }
   ```



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1838,9 +1949,114 @@ class ObjectOutputStream final : public 
io::OutputStream {
     return Status::OK();
   }
 
-  static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
-                                  int part_number, const 
S3Model::UploadPartRequest& req,
-                                  const Result<S3Model::UploadPartOutcome>& 
result) {
+  static Status UploadError(const Aws::S3::Model::PutObjectRequest& request,

Review Comment:
   Perhaps call this `UploadUsingSingleRequestError`?



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1606,42 +1656,62 @@ class ObjectOutputStream final : public 
io::OutputStream {
       return Status::OK();
     }
 
-    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    if (is_multipart_created_) {
+      ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
 
-    S3Model::AbortMultipartUploadRequest req;
-    req.SetBucket(ToAwsString(path_.bucket));
-    req.SetKey(ToAwsString(path_.key));
-    req.SetUploadId(upload_id_);
+      S3Model::AbortMultipartUploadRequest req;
+      req.SetBucket(ToAwsString(path_.bucket));
+      req.SetKey(ToAwsString(path_.key));
+      req.SetUploadId(upload_id_);
 
-    auto outcome = client_lock.Move()->AbortMultipartUpload(req);
-    if (!outcome.IsSuccess()) {
-      return ErrorToStatus(
-          std::forward_as_tuple("When aborting multiple part upload for key 
'", path_.key,
-                                "' in bucket '", path_.bucket, "': "),
-          "AbortMultipartUpload", outcome.GetError());
+      auto outcome = client_lock.Move()->AbortMultipartUpload(req);
+      if (!outcome.IsSuccess()) {
+        return ErrorToStatus(
+            std::forward_as_tuple("When aborting multiple part upload for key 
'",
+                                  path_.key, "' in bucket '", path_.bucket, 
"': "),
+            "AbortMultipartUpload", outcome.GetError());
+      }
     }
+
     current_part_.reset();
     holder_ = nullptr;
     closed_ = true;
+
     return Status::OK();
   }
 
   // OutputStream interface
 
+  bool ShouldBeMultipartUpload() const { return pos_ > 
kMultiPartUploadThresholdSize; }
+
+  bool IsMultipartUpload() const {
+    return ShouldBeMultipartUpload() || is_multipart_created_;

Review Comment:
   Why add `is_multipart_created_` here? Is there any situation where 
`is_multipart_created_` would be true but `ShouldBeMultipartUpload()` would be 
false?



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1560,34 +1582,49 @@ class ObjectOutputStream final : public 
io::OutputStream {
         path_(path),
         metadata_(metadata),
         default_metadata_(options.default_metadata),
-        background_writes_(options.background_writes) {}
+        background_writes_(options.background_writes),
+        allow_delayed_open_(options.allow_delayed_open) {}
 
   ~ObjectOutputStream() override {
     // For compliance with the rest of the IO stack, Close rather than Abort,
     // even though it may be more expensive.
     io::internal::CloseFromDestructor(this);
   }
 
-  Status Init() {
-    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+  template <typename ObjectRequest>
+  Status SetMetadataInRequest(ObjectRequest* request) {
+    std::shared_ptr<const KeyValueMetadata> metadata;
 
-    // Initiate the multi-part upload
-    S3Model::CreateMultipartUploadRequest req;
-    req.SetBucket(ToAwsString(path_.bucket));
-    req.SetKey(ToAwsString(path_.key));
     if (metadata_ && metadata_->size() != 0) {
-      RETURN_NOT_OK(SetObjectMetadata(metadata_, &req));
+      metadata = metadata_;
     } else if (default_metadata_ && default_metadata_->size() != 0) {
-      RETURN_NOT_OK(SetObjectMetadata(default_metadata_, &req));
+      metadata = default_metadata_;
     }
 
-    // If we do not set anything then the SDK will default to application/xml
-    // which confuses some tools (https://github.com/apache/arrow/issues/11934)
-    // So we instead default to application/octet-stream which is less 
misleading
-    if (!req.ContentTypeHasBeenSet()) {
-      req.SetContentType("application/octet-stream");
+    if (metadata &&
+        
metadata->Contains(ObjectMetadataSetter<ObjectRequest>::kContentTypeKey)) {
+      RETURN_NOT_OK(SetObjectMetadata(metadata, request));
+    } else {
+      // If we do not set anything then the SDK will default to application/xml
+      // which confuses some tools 
(https://github.com/apache/arrow/issues/11934)
+      // So we instead default to application/octet-stream which is less 
misleading
+      request->SetContentType("application/octet-stream");

Review Comment:
   So `metadata` is ignored if it doesn't contain a "Content-Type" key? Or am I 
missing something here?



##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -1189,40 +1191,55 @@ TEST_F(TestS3FS, OpenInputFile) {
   ASSERT_RAISES(IOError, file->Seek(10));
 }
 
-TEST_F(TestS3FS, OpenOutputStreamBackgroundWrites) { TestOpenOutputStream(); }
+struct S3OptionsTestParameters {
+  bool background_writes{false};
+  bool allow_delayed_open{false};
 
-TEST_F(TestS3FS, OpenOutputStreamSyncWrites) {
-  options_.background_writes = false;
-  MakeFileSystem();
-  TestOpenOutputStream();
-}
-
-TEST_F(TestS3FS, OpenOutputStreamAbortBackgroundWrites) { 
TestOpenOutputStreamAbort(); }
+  void apply_to_s3_options(S3Options& options) const {
+    options.background_writes = background_writes;
+    options.allow_delayed_open = allow_delayed_open;
+  }

Review Comment:
   Coding style nit: 1) use `CamelCase` for function names, 2) avoid mutable 
refs; therefore:
   ```suggestion
     void ApplyToS3Options(S3Options* options) const {
       options->background_writes = background_writes;
       options->allow_delayed_open = allow_delayed_open;
     }
   ```



##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -1189,40 +1191,55 @@ TEST_F(TestS3FS, OpenInputFile) {
   ASSERT_RAISES(IOError, file->Seek(10));
 }
 
-TEST_F(TestS3FS, OpenOutputStreamBackgroundWrites) { TestOpenOutputStream(); }
+struct S3OptionsTestParameters {
+  bool background_writes{false};
+  bool allow_delayed_open{false};
 
-TEST_F(TestS3FS, OpenOutputStreamSyncWrites) {
-  options_.background_writes = false;
-  MakeFileSystem();
-  TestOpenOutputStream();
-}
-
-TEST_F(TestS3FS, OpenOutputStreamAbortBackgroundWrites) { 
TestOpenOutputStreamAbort(); }
+  void apply_to_s3_options(S3Options& options) const {
+    options.background_writes = background_writes;
+    options.allow_delayed_open = allow_delayed_open;
+  }
 
-TEST_F(TestS3FS, OpenOutputStreamAbortSyncWrites) {
-  options_.background_writes = false;
-  MakeFileSystem();
-  TestOpenOutputStreamAbort();
-}
+  static std::vector<S3OptionsTestParameters> GetCartesianProduct() {
+    return {
+        S3OptionsTestParameters{true, false},
+        S3OptionsTestParameters{false, false},
+        S3OptionsTestParameters{true, true},
+        S3OptionsTestParameters{false, true},
+    };
+  }
+};
 
-TEST_F(TestS3FS, OpenOutputStreamDestructorBackgroundWrites) {
-  TestOpenOutputStreamDestructor();
+TEST_F(TestS3FS, OpenOutputStream) {
+  for (const auto& combination : 
S3OptionsTestParameters::GetCartesianProduct()) {
+    combination.apply_to_s3_options(options_);
+    MakeFileSystem();

Review Comment:
   I don't think this deletes the files already on the filesystem between 
loop iterations, which means the tests for later option combinations might 
succeed even if the write path doesn't work for those options.



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1853,8 +2069,8 @@ class ObjectOutputStream final : public io::OutputStream {
       }
     }
     // Notify completion
-    if (--state->parts_in_progress == 0) {
-      state->pending_parts_completed.MarkFinished(state->status);
+    if (--state->uploads_in_progress == 0) {
+      state->pending_uploads_completed.MarkFinished(state->status);

Review Comment:
   Same here.



##########
cpp/src/arrow/filesystem/s3fs_test.cc:
##########
@@ -1189,40 +1191,55 @@ TEST_F(TestS3FS, OpenInputFile) {
   ASSERT_RAISES(IOError, file->Seek(10));
 }
 
-TEST_F(TestS3FS, OpenOutputStreamBackgroundWrites) { TestOpenOutputStream(); }
+struct S3OptionsTestParameters {
+  bool background_writes{false};
+  bool allow_delayed_open{false};
 
-TEST_F(TestS3FS, OpenOutputStreamSyncWrites) {
-  options_.background_writes = false;
-  MakeFileSystem();
-  TestOpenOutputStream();
-}
-
-TEST_F(TestS3FS, OpenOutputStreamAbortBackgroundWrites) { 
TestOpenOutputStreamAbort(); }
+  void apply_to_s3_options(S3Options& options) const {
+    options.background_writes = background_writes;
+    options.allow_delayed_open = allow_delayed_open;
+  }
 
-TEST_F(TestS3FS, OpenOutputStreamAbortSyncWrites) {
-  options_.background_writes = false;
-  MakeFileSystem();
-  TestOpenOutputStreamAbort();
-}
+  static std::vector<S3OptionsTestParameters> GetCartesianProduct() {
+    return {
+        S3OptionsTestParameters{true, false},
+        S3OptionsTestParameters{false, false},
+        S3OptionsTestParameters{true, true},
+        S3OptionsTestParameters{false, true},
+    };
+  }
+};
 
-TEST_F(TestS3FS, OpenOutputStreamDestructorBackgroundWrites) {
-  TestOpenOutputStreamDestructor();
+TEST_F(TestS3FS, OpenOutputStream) {
+  for (const auto& combination : 
S3OptionsTestParameters::GetCartesianProduct()) {
+    combination.apply_to_s3_options(options_);

Review Comment:
   Ideally we would also leave a trace in the test log to make diagnosing 
failures easier (see the `ARROW_SCOPED_TRACE` macro in `arrow/testing/gtest_util.h`).



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1873,24 +2089,18 @@ class ObjectOutputStream final : public 
io::OutputStream {
     state->completed_parts[slot] = std::move(part);
   }
 
-  static Status UploadPartError(const S3Model::UploadPartRequest& req,
-                                const S3Model::UploadPartOutcome& outcome) {
-    return ErrorToStatus(
-        std::forward_as_tuple("When uploading part for key '", req.GetKey(),
-                              "' in bucket '", req.GetBucket(), "': "),
-        "UploadPart", outcome.GetError());
-  }
-
  protected:
   std::shared_ptr<S3ClientHolder> holder_;
   const io::IOContext io_context_;
   const S3Path path_;
   const std::shared_ptr<const KeyValueMetadata> metadata_;
   const std::shared_ptr<const KeyValueMetadata> default_metadata_;
   const bool background_writes_;
+  const bool allow_delayed_open_;
 
   Aws::String upload_id_;
   bool closed_ = true;
+  bool is_multipart_created_ = false;

Review Comment:
   For the record, `is_multipart_created_ == true` iff `!upload_id_.empty()`, 
right? Perhaps this can be consolidated and we can rename `upload_id_` to 
something more explicit, such as `multipart_upload_id_`.



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1838,9 +1949,114 @@ class ObjectOutputStream final : public 
io::OutputStream {
     return Status::OK();
   }
 
-  static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
-                                  int part_number, const 
S3Model::UploadPartRequest& req,
-                                  const Result<S3Model::UploadPartOutcome>& 
result) {
+  static Status UploadError(const Aws::S3::Model::PutObjectRequest& request,
+                            const Aws::S3::Model::PutObjectOutcome& outcome) {
+    return ErrorToStatus(
+        std::forward_as_tuple("When uploading object with key '", 
request.GetKey(),
+                              "' in bucket '", request.GetBucket(), "': "),
+        "PutObject", outcome.GetError());
+  }
+
+  Status UploadUsingSingleRequest(std::shared_ptr<Buffer> buffer) {
+    return UploadUsingSingleRequest(buffer->data(), buffer->size(), buffer);
+  }
+
+  Status UploadUsingSingleRequest(const void* data, int64_t nbytes,
+                                  std::shared_ptr<Buffer> owned_buffer = 
nullptr) {
+    auto sync_result_callback = [](const Aws::S3::Model::PutObjectRequest& 
request,
+                                   std::shared_ptr<UploadState> state,
+                                   int32_t part_number,
+                                   Aws::S3::Model::PutObjectOutcome outcome) {
+      if (!outcome.IsSuccess()) {
+        return UploadError(request, outcome);
+      }
+      return Status::OK();
+    };
+
+    auto async_result_callback = [](const Aws::S3::Model::PutObjectRequest& 
request,
+                                    std::shared_ptr<UploadState> state,
+                                    int32_t part_number,
+                                    Aws::S3::Model::PutObjectOutcome outcome) {
+      HandleUploadUsingSingleRequestOutcome(state, request, 
outcome.GetResult());
+      return Status::OK();
+    };
+
+    Aws::S3::Model::PutObjectRequest req{};
+    RETURN_NOT_OK(SetMetadataInRequest(&req));
+
+    return Upload<Aws::S3::Model::PutObjectRequest, 
Aws::S3::Model::PutObjectOutcome>(
+        std::move(req), std::move(sync_result_callback), 
std::move(async_result_callback),
+        data, nbytes, std::move(owned_buffer));
+  }
+
+  Status UploadPart(std::shared_ptr<Buffer> buffer) {
+    return UploadPart(buffer->data(), buffer->size(), buffer);
+  }
+
+  static Status UploadPartError(const Aws::S3::Model::UploadPartRequest& 
request,
+                                const Aws::S3::Model::UploadPartOutcome& 
outcome) {
+    return ErrorToStatus(
+        std::forward_as_tuple("When uploading part for key '", 
request.GetKey(),
+                              "' in bucket '", request.GetBucket(), "': "),
+        "UploadPart", outcome.GetError());
+  }
+
+  Status UploadPart(const void* data, int64_t nbytes,
+                    std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    if (!is_multipart_created_) {
+      RETURN_NOT_OK(CreateMultipartUpload());
+    }
+
+    Aws::S3::Model::UploadPartRequest req{};
+    req.SetPartNumber(part_number_);
+    req.SetUploadId(upload_id_);
+
+    auto sync_result_callback = [](const Aws::S3::Model::UploadPartRequest& 
request,
+                                   std::shared_ptr<UploadState> state,
+                                   int32_t part_number,
+                                   Aws::S3::Model::UploadPartOutcome outcome) {
+      if (!outcome.IsSuccess()) {
+        return UploadPartError(request, outcome);
+      } else {
+        AddCompletedPart(state, part_number, outcome.GetResult());
+      }
+
+      return Status::OK();
+    };
+
+    auto async_result_callback = [](const Aws::S3::Model::UploadPartRequest& 
request,
+                                    std::shared_ptr<UploadState> state,
+                                    int32_t part_number,
+                                    Aws::S3::Model::UploadPartOutcome outcome) 
{
+      HandleUploadPartOutcome(state, part_number, request, 
outcome.GetResult());
+      return Status::OK();
+    };
+
+    return Upload<Aws::S3::Model::UploadPartRequest, 
Aws::S3::Model::UploadPartOutcome>(
+        std::move(req), std::move(sync_result_callback), 
std::move(async_result_callback),
+        data, nbytes, std::move(owned_buffer));
+  }
+
+  static void HandleUploadUsingSingleRequestOutcome(
+      const std::shared_ptr<UploadState>& state, const 
S3Model::PutObjectRequest& req,
+      const Result<S3Model::PutObjectOutcome>& result) {
+    std::unique_lock<std::mutex> lock(state->mutex);
+    if (!result.ok()) {
+      state->status &= result.status();
+    } else {
+      const auto& outcome = *result;
+      if (!outcome.IsSuccess()) {
+        state->status &= UploadError(req, outcome);
+      }
+    }
+
+    state->pending_uploads_completed.MarkFinished(state->status);

Review Comment:
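   Based on https://github.com/apache/arrow/pull/41876/files, we probably want 
to unlock the mutex before marking the future finished: `MarkFinished` may run 
the future's callbacks synchronously, and a callback that tries to take the 
same mutex would then deadlock.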



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to