kou commented on code in PR #43096:
URL: https://github.com/apache/arrow/pull/43096#discussion_r1716293902


##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1118,114 @@ class ObjectAppendStream final : public io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, commit_block_list_options_);
+
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    RETURN_NOT_OK(pending_blocks_completed.status());
+    std::unique_lock<std::mutex> lock(upload_state_->mutex);
+    return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+                           commit_block_list_options_);
   }
 
- private:
-  Status DoAppend(const void* data, int64_t nbytes,
-                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
-    RETURN_NOT_OK(CheckClosed("append"));
-    auto append_data = reinterpret_cast<const uint8_t*>(data);
-    Core::IO::MemoryBodyStream block_content(append_data, nbytes);
-    if (block_content.Length() == 0) {
+  Future<> FlushAsync() {
+    RETURN_NOT_OK(CheckClosed("flushAsync"));

Review Comment:
   ```suggestion
       RETURN_NOT_OK(CheckClosed("flush async"));
   ```



##########
cpp/src/arrow/filesystem/azurefs.h:
##########
@@ -112,6 +112,9 @@ struct ARROW_EXPORT AzureOptions {
   /// This will be ignored if non-empty metadata is passed to OpenOutputStream.
   std::shared_ptr<const KeyValueMetadata> default_metadata;
 
+  /// Whether OutputStream writes will be issued in the background, without blocking.
+  bool background_writes = false;

Review Comment:
   Could you add support for this in `AzureOptions::ExtractFromUriQuery()`?



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1008,16 +1031,23 @@ class ObjectAppendStream final : public io::OutputStream {
         content_length_ = 0;
       }
     }
+
+    upload_state_ = std::make_shared<UploadState>();
+
     if (content_length_ > 0) {
       ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
       for (auto block : block_list.CommittedBlocks) {
-        block_ids_.push_back(block.Name);
+        upload_state_->block_ids.push_back(block.Name);
       }
     }
     initialised_ = true;
     return Status::OK();
   }
 
+  std::shared_ptr<ObjectAppendStream> Self() {
+    return std::dynamic_pointer_cast<ObjectAppendStream>(shared_from_this());
+  }

Review Comment:
   Can we make this `private`?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -1471,6 +1473,93 @@ class TestAzureFileSystem : public ::testing::Test {
     arrow::fs::AssertFileInfo(fs(), data.Path("dir/file0"), FileType::File);
   }
 
+  void AssertObjectContents(AzureFileSystem* fs, std::string_view path,
+                            std::string_view expected) {
+    ASSERT_OK_AND_ASSIGN(auto input, fs->OpenInputStream(std::string{path}));
+    std::string contents;
+    std::shared_ptr<Buffer> buffer;
+    do {
+      ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
+      ASSERT_TRUE(buffer);

Review Comment:
   OK. Let's remove it. The previous `ASSERT_OK_AND_ASSIGN(buffer)` should
already detect any invalid situation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to