felipecrv commented on code in PR #43096:
URL: https://github.com/apache/arrow/pull/43096#discussion_r1697002110


##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1113,95 @@ class ObjectAppendStream final : public 
io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, 
commit_block_list_options_);
+
+    auto fut = FlushAsync();
+    RETURN_NOT_OK(fut.status());
+
+    std::unique_lock<std::mutex> lock(upload_state_->mutex);
+    return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+                           commit_block_list_options_);

Review Comment:
   This code acquires the lock protecting `upload_state_->block_ids` twice. 
Isn't there a risk of `pending_blocks_completed` going from completed to 
not-completed when the lock is released right before the new acquisition?
   
   This is another case of tiny functions obfuscating the logic.
   ```suggestion
       std::unique_lock<std::mutex> lock(upload_state_->mutex);
       RETURN_NOT_OK(upload_state_->pending_blocks_completed.status());
   
       return CommitBlockList(block_blob_client_, upload_state_->block_ids,
                              commit_block_list_options_);
   ```



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1093,36 +1215,103 @@ class ObjectAppendStream final : public 
io::OutputStream {
     new_block_id.insert(0, required_padding_digits, '0');
     // There is a small risk when appending to a blob created by another 
client that
     // `new_block_id` may overlapping with an existing block id. Adding the 
`-arrow`
-    // suffix significantly reduces the risk, but does not 100% eliminate it. 
For example
-    // if the blob was previously created with one block, with id 
`00001-arrow` then the
-    // next block we append will conflict with that, and cause corruption.
+    // suffix significantly reduces the risk, but does not 100% eliminate it. 
For
+    // example if the blob was previously created with one block, with id 
`00001-arrow`
+    // then the next block we append will conflict with that, and cause 
corruption.
     new_block_id += "-arrow";
     new_block_id = Core::Convert::Base64Encode(
         std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));
 
-    try {
-      block_blob_client_->StageBlock(new_block_id, block_content);
-    } catch (const Storage::StorageException& exception) {
-      return ExceptionToStatus(
-          exception, "StageBlock failed for '", block_blob_client_->GetUrl(),
-          "' new_block_id: '", new_block_id,
-          "'. Staging new blocks is fundamental to streaming writes to blob 
storage.");
+    upload_state_->block_ids.push_back(new_block_id);
+
+    // We only use the future if we have background writes enabled. Without 
background
+    // writes the future is initialized as finished and not mutated any more.
+    if (background_writes_ && upload_state_->blocks_in_progress++ == 0) {
+      upload_state_->pending_blocks_completed = Future<>::Make();
     }
-    block_ids_.push_back(new_block_id);
-    pos_ += nbytes;
-    content_length_ += nbytes;
+
+    return new_block_id;
+  }
+
+  Status AppendBlock(const void* data, int64_t nbytes,
+                     std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    RETURN_NOT_OK(CheckClosed("append"));
+
+    if (nbytes == 0) {
+      return Status::OK();
+    }
+
+    const auto block_id = CreateBlock();
+
+    if (background_writes_) {
+      if (owned_buffer == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, 
io_context_.pool()));
+        memcpy(owned_buffer->mutable_data(), data, nbytes);
+      } else {
+        DCHECK_EQ(data, owned_buffer->data());
+        DCHECK_EQ(nbytes, owned_buffer->size());
+      }
+
+      // The closure keeps the buffer and the upload state alive
+      auto deferred = [owned_buffer, block_id, block_blob_client = 
block_blob_client_,
+                       state = upload_state_]() mutable -> Status {
+        Core::IO::MemoryBodyStream block_content(owned_buffer->data(),
+                                                 owned_buffer->size());
+
+        auto status = StageBlock(block_blob_client.get(), block_id, 
block_content);
+        HandleUploadOutcome(state, status);
+        return Status::OK();
+      };
+      RETURN_NOT_OK(io::internal::SubmitIO(io_context_, std::move(deferred)));
+    } else {
+      auto append_data = reinterpret_cast<const uint8_t*>(data);
+      Core::IO::MemoryBodyStream block_content(append_data, nbytes);
+
+      RETURN_NOT_OK(StageBlock(block_blob_client_.get(), block_id, 
block_content));
+    }
+
     return Status::OK();
   }
 
+  static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
+                                  const Status& status) {
+    std::unique_lock<std::mutex> lock(state->mutex);
+    if (!status.ok()) {
+      state->status &= status;
+    }
+    // Notify completion
+    if (--state->blocks_in_progress == 0) {
+      auto fut = state->pending_blocks_completed;
+      lock.unlock();
+      fut.MarkFinished(state->status);
+    }
+  }
+
   std::shared_ptr<Blobs::BlockBlobClient> block_blob_client_;
   const io::IOContext io_context_;
   const AzureLocation location_;
   int64_t content_length_ = kNoSize;
 
+  std::shared_ptr<io::BufferOutputStream> current_block_;
+  int64_t current_block_size_ = 0;
+
+  const bool background_writes_;

Review Comment:
   This can be moved to be right after `location_` -- into the group of const 
members that configure instances of this class. Having clarity around what 
mutates and which mutex protects the mutable state will be necessary to 
understand the correctness of the behavior of this class when used from 
multiple threads.



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1027,16 +1052,38 @@ class ObjectAppendStream final : public 
io::OutputStream {
     return Status::OK();
   }
 
+  Status EnsureReadyToFlushFromClose() {

Review Comment:
   And considering that this is really just 3 lines, do you really need it to 
be a separate function?



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1027,16 +1052,38 @@ class ObjectAppendStream final : public 
io::OutputStream {
     return Status::OK();
   }
 
+  Status EnsureReadyToFlushFromClose() {

Review Comment:
   This must be `private:`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to