OliLay commented on code in PR #43096:
URL: https://github.com/apache/arrow/pull/43096#discussion_r1724546848
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1123,110 @@ class ObjectAppendStream final : public io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, commit_block_list_options_);
+
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    RETURN_NOT_OK(pending_blocks_completed.status());
+    std::unique_lock<std::mutex> lock(upload_state_->mutex);
+    return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+                           commit_block_list_options_);
   }
- private:
-  Status DoAppend(const void* data, int64_t nbytes,
-                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
-    RETURN_NOT_OK(CheckClosed("append"));
-    auto append_data = reinterpret_cast<const uint8_t*>(data);
-    Core::IO::MemoryBodyStream block_content(append_data, nbytes);
-    if (block_content.Length() == 0) {
+  Future<> FlushAsync() {
+    RETURN_NOT_OK(CheckClosed("flush async"));
+    if (!initialised_) {
+      // If the stream has not been successfully initialized then there is nothing to
+      // flush. This also avoids some unhandled errors when flushing in the destructor.
       return Status::OK();
     }
-    const auto n_block_ids = block_ids_.size();
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    return pending_blocks_completed.Then([self = Self()] {
+      std::unique_lock<std::mutex> lock(self->upload_state_->mutex);
+      return CommitBlockList(self->block_blob_client_, self->upload_state_->block_ids,
+                             self->commit_block_list_options_);
+    });
+  }
+
+ private:
+  Status AppendCurrentBlock() {
+    ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
+    current_block_.reset();
+    current_block_size_ = 0;
+    return AppendBlock(buf);
+  }
+
+  Status DoWrite(const void* data, int64_t nbytes,
+                 std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    if (closed_) {
+      return Status::Invalid("Operation on closed stream");
+    }
+
+    const auto* data_ptr = reinterpret_cast<const int8_t*>(data);
+    auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) {
+      data_ptr += offset;
+      nbytes -= offset;
+      pos_ += offset;
+      content_length_ += offset;
+    };
+
+    // Handle case where we have some bytes buffered from prior calls.
+    if (current_block_size_ > 0) {
+      // Try to fill current buffer
+      const int64_t to_copy =
+          std::min(nbytes, kBlockUploadSizeBytes - current_block_size_);
+      RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy));
+      current_block_size_ += to_copy;
+      advance_ptr(to_copy);
+
+      // If buffer isn't full, break
+      if (current_block_size_ < kBlockUploadSizeBytes) {
+        return Status::OK();
+      }
+
+      // Upload current buffer
+      RETURN_NOT_OK(AppendCurrentBlock());
+    }
+
+    // We can upload chunks without copying them into a buffer
+    while (nbytes >= kMaxBlockSizeBytes) {
+      RETURN_NOT_OK(AppendBlock(data_ptr, kMaxBlockSizeBytes));
+      advance_ptr(kMaxBlockSizeBytes);
+    }
Review Comment:
Definitely, that was a bit too naive on my part. I think your suggestion fixes this nicely:
[aea75ff](https://github.com/apache/arrow/pull/43096/commits/aea75ffcc38b484b9d960939d299b82f17b6b9d7)
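
For anyone skimming the thread, here is a minimal standalone sketch of the split-write shape `DoWrite` ends up with: top up the partially filled block first, upload full blocks directly from the caller's data, and buffer the tail. `kBlockSize`, `WriteSplit`, `buffer`, and `uploaded` are hypothetical stand-ins for illustration, not the PR's actual names (the real code uses `kBlockUploadSizeBytes`/`kMaxBlockSizeBytes` and `AppendBlock`):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical block size for illustration only.
constexpr int64_t kBlockSize = 8;

// Sketch of the split-write pattern. `uploaded` stands in for AppendBlock().
void WriteSplit(const uint8_t* data, int64_t nbytes, std::vector<uint8_t>& buffer,
                std::vector<std::vector<uint8_t>>& uploaded) {
  // Top up a partially filled buffer from earlier writes.
  if (!buffer.empty()) {
    const int64_t to_copy = std::min<int64_t>(
        nbytes, kBlockSize - static_cast<int64_t>(buffer.size()));
    buffer.insert(buffer.end(), data, data + to_copy);
    data += to_copy;
    nbytes -= to_copy;
    // Buffer still not full: wait for more data or the final flush.
    if (static_cast<int64_t>(buffer.size()) < kBlockSize) return;
    uploaded.push_back(buffer);  // block is full, "upload" it
    buffer.clear();
  }
  // Full blocks bypass the buffer entirely, avoiding an extra copy.
  while (nbytes >= kBlockSize) {
    uploaded.emplace_back(data, data + kBlockSize);
    data += kBlockSize;
    nbytes -= kBlockSize;
  }
  // Remainder stays buffered for the next write or the final flush.
  buffer.insert(buffer.end(), data, data + nbytes);
}
```

The invariant in the sketch is that `buffer` is either empty or strictly smaller than a block on return, so a final flush only ever has at most one partial block left to commit.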
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]