This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new e1e7c50101 GH-40036: [C++] Azure file system write buffering & async 
writes (#43096)
e1e7c50101 is described below

commit e1e7c501019ac26c896d61fa0c129eee83da9b55
Author: Oliver Layer <[email protected]>
AuthorDate: Wed Aug 21 13:22:57 2024 +0200

    GH-40036: [C++] Azure file system write buffering & async writes (#43096)
    
    ### Rationale for this change
    
    See #40036.
    
    ### What changes are included in this PR?
    
    Write buffering and async writes (similar to what the S3 file system does) 
in the `ObjectAppendStream` for the Azure file system.
    
    With write buffering and async writes, the input scenario creation runtime 
in the tests (which uses the `ObjectAppendStream` against Azurite) decreased 
from ~25s (see [here](https://github.com/apache/arrow/issues/40036)) to ~800ms:
    ```
    [ RUN      ] TestAzuriteFileSystem.OpenInputFileMixedReadVsReadAt
    [       OK ] TestAzuriteFileSystem.OpenInputFileMixedReadVsReadAt (787 ms)
    ```
    
    ### Are these changes tested?
    Added some tests with background writes enabled and disabled (some were 
taken from the S3 tests). Everything changed should be covered.
    
    ### Are there any user-facing changes?
    `AzureOptions` now allows for `background_writes` to be set (default: 
true). No breaking changes.
    
    ### Notes
    
    - The code in `DoWrite` is very similar to [the code in the S3 
FS](https://github.com/apache/arrow/blob/edfa343eeca008513f0300924380e1b187cc976b/cpp/src/arrow/filesystem/s3fs.cc#L1753).
 Maybe this could be unified? I didn't see this in the scope of the PR though.
    * GitHub Issue: #40036
    
    Lead-authored-by: Oliver Layer <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/filesystem/azurefs.cc      | 276 +++++++++++++++++++++++++++----
 cpp/src/arrow/filesystem/azurefs.h       |   3 +
 cpp/src/arrow/filesystem/azurefs_test.cc | 264 ++++++++++++++++++++++++-----
 3 files changed, 471 insertions(+), 72 deletions(-)

diff --git a/cpp/src/arrow/filesystem/azurefs.cc 
b/cpp/src/arrow/filesystem/azurefs.cc
index 9b3c0c0c1d..0bad856339 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -22,6 +22,7 @@
 
 #include "arrow/filesystem/azurefs.h"
 #include "arrow/filesystem/azurefs_internal.h"
+#include "arrow/io/memory.h"
 
 // idenfity.hpp triggers -Wattributes warnings cause -Werror builds to fail,
 // so disable it for this file with pragmas.
@@ -144,6 +145,9 @@ Status AzureOptions::ExtractFromUriQuery(const Uri& uri) {
         blob_storage_scheme = "http";
         dfs_storage_scheme = "http";
       }
+    } else if (kv.first == "background_writes") {
+      ARROW_ASSIGN_OR_RAISE(background_writes,
+                            ::arrow::internal::ParseBoolean(kv.second));
     } else {
       return Status::Invalid(
           "Unexpected query parameter in Azure Blob File System URI: '", 
kv.first, "'");
@@ -937,8 +941,8 @@ Status 
CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_bl
                        const std::vector<std::string>& block_ids,
                        const Blobs::CommitBlockListOptions& options) {
   try {
-    // CommitBlockList puts all block_ids in the latest element. That means in 
the case of
-    // overlapping block_ids the newly staged block ids will always replace the
+    // CommitBlockList puts all block_ids in the latest element. That means in 
the case
+    // of overlapping block_ids the newly staged block ids will always replace 
the
     // previously committed blocks.
     // 
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
     block_blob_client->CommitBlockList(block_ids, options);
@@ -950,7 +954,34 @@ Status 
CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_bl
   return Status::OK();
 }
 
+Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const 
std::string& id,
+                  Core::IO::MemoryBodyStream& content) {
+  try {
+    block_blob_client->StageBlock(id, content);
+  } catch (const Storage::StorageException& exception) {
+    return ExceptionToStatus(
+        exception, "StageBlock failed for '", block_blob_client->GetUrl(),
+        "' new_block_id: '", id,
+        "'. Staging new blocks is fundamental to streaming writes to blob 
storage.");
+  }
+
+  return Status::OK();
+}
+
+/// Writes will be buffered up to this size (in bytes) before actually 
uploading them.
+static constexpr int64_t kBlockUploadSizeBytes = 10 * 1024 * 1024;
+/// The maximum size of a block in Azure Blob (as per docs).
+static constexpr int64_t kMaxBlockSizeBytes = 4UL * 1024 * 1024 * 1024;
+
+/// This output stream, similar to other arrow OutputStreams, is not 
thread-safe.
 class ObjectAppendStream final : public io::OutputStream {
+ private:
+  struct UploadState;
+
+  std::shared_ptr<ObjectAppendStream> Self() {
+    return std::dynamic_pointer_cast<ObjectAppendStream>(shared_from_this());
+  }
+
  public:
   ObjectAppendStream(std::shared_ptr<Blobs::BlockBlobClient> block_blob_client,
                      const io::IOContext& io_context, const AzureLocation& 
location,
@@ -958,7 +989,8 @@ class ObjectAppendStream final : public io::OutputStream {
                      const AzureOptions& options)
       : block_blob_client_(std::move(block_blob_client)),
         io_context_(io_context),
-        location_(location) {
+        location_(location),
+        background_writes_(options.background_writes) {
     if (metadata && metadata->size() != 0) {
       ArrowMetadataToCommitBlockListOptions(metadata, 
commit_block_list_options_);
     } else if (options.default_metadata && options.default_metadata->size() != 
0) {
@@ -1008,10 +1040,13 @@ class ObjectAppendStream final : public 
io::OutputStream {
         content_length_ = 0;
       }
     }
+
+    upload_state_ = std::make_shared<UploadState>();
+
     if (content_length_ > 0) {
       ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
       for (auto block : block_list.CommittedBlocks) {
-        block_ids_.push_back(block.Name);
+        upload_state_->block_ids.push_back(block.Name);
       }
     }
     initialised_ = true;
@@ -1031,12 +1066,34 @@ class ObjectAppendStream final : public 
io::OutputStream {
     if (closed_) {
       return Status::OK();
     }
+
+    if (current_block_) {
+      // Upload remaining buffer
+      RETURN_NOT_OK(AppendCurrentBlock());
+    }
+
     RETURN_NOT_OK(Flush());
     block_blob_client_ = nullptr;
     closed_ = true;
     return Status::OK();
   }
 
+  Future<> CloseAsync() override {
+    if (closed_) {
+      return Status::OK();
+    }
+
+    if (current_block_) {
+      // Upload remaining buffer
+      RETURN_NOT_OK(AppendCurrentBlock());
+    }
+
+    return FlushAsync().Then([self = Self()]() {
+      self->block_blob_client_ = nullptr;
+      self->closed_ = true;
+    });
+  }
+
   bool closed() const override { return closed_; }
 
   Status CheckClosed(const char* action) const {
@@ -1052,11 +1109,11 @@ class ObjectAppendStream final : public 
io::OutputStream {
   }
 
   Status Write(const std::shared_ptr<Buffer>& buffer) override {
-    return DoAppend(buffer->data(), buffer->size(), buffer);
+    return DoWrite(buffer->data(), buffer->size(), buffer);
   }
 
   Status Write(const void* data, int64_t nbytes) override {
-    return DoAppend(data, nbytes);
+    return DoWrite(data, nbytes);
   }
 
   Status Flush() override {
@@ -1066,20 +1123,111 @@ class ObjectAppendStream final : public 
io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, 
commit_block_list_options_);
+
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    RETURN_NOT_OK(pending_blocks_completed.status());
+    std::unique_lock<std::mutex> lock(upload_state_->mutex);
+    return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+                           commit_block_list_options_);
   }
 
- private:
-  Status DoAppend(const void* data, int64_t nbytes,
-                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
-    RETURN_NOT_OK(CheckClosed("append"));
-    auto append_data = reinterpret_cast<const uint8_t*>(data);
-    Core::IO::MemoryBodyStream block_content(append_data, nbytes);
-    if (block_content.Length() == 0) {
+  Future<> FlushAsync() {
+    RETURN_NOT_OK(CheckClosed("flush async"));
+    if (!initialised_) {
+      // If the stream has not been successfully initialized then there is 
nothing to
+      // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
 
-    const auto n_block_ids = block_ids_.size();
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    return pending_blocks_completed.Then([self = Self()] {
+      std::unique_lock<std::mutex> lock(self->upload_state_->mutex);
+      return CommitBlockList(self->block_blob_client_, 
self->upload_state_->block_ids,
+                             self->commit_block_list_options_);
+    });
+  }
+
+ private:
+  Status AppendCurrentBlock() {
+    ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
+    current_block_.reset();
+    current_block_size_ = 0;
+    return AppendBlock(buf);
+  }
+
+  Status DoWrite(const void* data, int64_t nbytes,
+                 std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    if (closed_) {
+      return Status::Invalid("Operation on closed stream");
+    }
+
+    const auto* data_ptr = reinterpret_cast<const int8_t*>(data);
+    auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) {
+      data_ptr += offset;
+      nbytes -= offset;
+      pos_ += offset;
+      content_length_ += offset;
+    };
+
+    // Handle case where we have some bytes buffered from prior calls.
+    if (current_block_size_ > 0) {
+      // Try to fill current buffer
+      const int64_t to_copy =
+          std::min(nbytes, kBlockUploadSizeBytes - current_block_size_);
+      RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy));
+      current_block_size_ += to_copy;
+      advance_ptr(to_copy);
+
+      // If buffer isn't full, break
+      if (current_block_size_ < kBlockUploadSizeBytes) {
+        return Status::OK();
+      }
+
+      // Upload current buffer
+      RETURN_NOT_OK(AppendCurrentBlock());
+    }
+
+    // We can upload chunks without copying them into a buffer
+    while (nbytes >= kBlockUploadSizeBytes) {
+      const auto upload_size = std::min(nbytes, kMaxBlockSizeBytes);
+      RETURN_NOT_OK(AppendBlock(data_ptr, upload_size));
+      advance_ptr(upload_size);
+    }
+
+    // Buffer remaining bytes
+    if (nbytes > 0) {
+      current_block_size_ = nbytes;
+
+      if (current_block_ == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(
+            current_block_,
+            io::BufferOutputStream::Create(kBlockUploadSizeBytes, 
io_context_.pool()));
+      } else {
+        // Re-use the allocation from before.
+        RETURN_NOT_OK(current_block_->Reset(kBlockUploadSizeBytes, 
io_context_.pool()));
+      }
+
+      RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_));
+      pos_ += current_block_size_;
+      content_length_ += current_block_size_;
+    }
+
+    return Status::OK();
+  }
+
+  std::string CreateBlock() {
+    std::unique_lock<std::mutex> lock(upload_state_->mutex);
+    const auto n_block_ids = upload_state_->block_ids.size();
 
     // New block ID must always be distinct from the existing block IDs. 
Otherwise we
     // will accidentally replace the content of existing blocks, causing 
corruption.
@@ -1093,36 +1241,106 @@ class ObjectAppendStream final : public 
io::OutputStream {
     new_block_id.insert(0, required_padding_digits, '0');
     // There is a small risk when appending to a blob created by another 
client that
     // `new_block_id` may overlapping with an existing block id. Adding the 
`-arrow`
-    // suffix significantly reduces the risk, but does not 100% eliminate it. 
For example
-    // if the blob was previously created with one block, with id 
`00001-arrow` then the
-    // next block we append will conflict with that, and cause corruption.
+    // suffix significantly reduces the risk, but does not 100% eliminate it. 
For
+    // example if the blob was previously created with one block, with id 
`00001-arrow`
+    // then the next block we append will conflict with that, and cause 
corruption.
     new_block_id += "-arrow";
     new_block_id = Core::Convert::Base64Encode(
         std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));
 
-    try {
-      block_blob_client_->StageBlock(new_block_id, block_content);
-    } catch (const Storage::StorageException& exception) {
-      return ExceptionToStatus(
-          exception, "StageBlock failed for '", block_blob_client_->GetUrl(),
-          "' new_block_id: '", new_block_id,
-          "'. Staging new blocks is fundamental to streaming writes to blob 
storage.");
+    upload_state_->block_ids.push_back(new_block_id);
+
+    // We only use the future if we have background writes enabled. Without 
background
+    // writes the future is initialized as finished and not mutated any more.
+    if (background_writes_ && upload_state_->blocks_in_progress++ == 0) {
+      upload_state_->pending_blocks_completed = Future<>::Make();
     }
-    block_ids_.push_back(new_block_id);
-    pos_ += nbytes;
-    content_length_ += nbytes;
+
+    return new_block_id;
+  }
+
+  Status AppendBlock(const void* data, int64_t nbytes,
+                     std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    RETURN_NOT_OK(CheckClosed("append"));
+
+    if (nbytes == 0) {
+      return Status::OK();
+    }
+
+    const auto block_id = CreateBlock();
+
+    if (background_writes_) {
+      if (owned_buffer == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, 
io_context_.pool()));
+        memcpy(owned_buffer->mutable_data(), data, nbytes);
+      } else {
+        DCHECK_EQ(data, owned_buffer->data());
+        DCHECK_EQ(nbytes, owned_buffer->size());
+      }
+
+      // The closure keeps the buffer and the upload state alive
+      auto deferred = [owned_buffer, block_id, block_blob_client = 
block_blob_client_,
+                       state = upload_state_]() mutable -> Status {
+        Core::IO::MemoryBodyStream block_content(owned_buffer->data(),
+                                                 owned_buffer->size());
+
+        auto status = StageBlock(block_blob_client.get(), block_id, 
block_content);
+        HandleUploadOutcome(state, status);
+        return Status::OK();
+      };
+      RETURN_NOT_OK(io::internal::SubmitIO(io_context_, std::move(deferred)));
+    } else {
+      auto append_data = reinterpret_cast<const uint8_t*>(data);
+      Core::IO::MemoryBodyStream block_content(append_data, nbytes);
+
+      RETURN_NOT_OK(StageBlock(block_blob_client_.get(), block_id, 
block_content));
+    }
+
     return Status::OK();
   }
 
+  Status AppendBlock(std::shared_ptr<Buffer> buffer) {
+    return AppendBlock(buffer->data(), buffer->size(), buffer);
+  }
+
+  static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
+                                  const Status& status) {
+    std::unique_lock<std::mutex> lock(state->mutex);
+    if (!status.ok()) {
+      state->status &= status;
+    }
+    // Notify completion
+    if (--state->blocks_in_progress == 0) {
+      auto fut = state->pending_blocks_completed;
+      lock.unlock();
+      fut.MarkFinished(state->status);
+    }
+  }
+
   std::shared_ptr<Blobs::BlockBlobClient> block_blob_client_;
   const io::IOContext io_context_;
   const AzureLocation location_;
+  const bool background_writes_;
   int64_t content_length_ = kNoSize;
 
+  std::shared_ptr<io::BufferOutputStream> current_block_;
+  int64_t current_block_size_ = 0;
+
   bool closed_ = false;
   bool initialised_ = false;
   int64_t pos_ = 0;
-  std::vector<std::string> block_ids_;
+
+  // This struct is kept alive through background writes to avoid problems
+  // in the completion handler.
+  struct UploadState {
+    std::mutex mutex;
+    std::vector<std::string> block_ids;
+    int64_t blocks_in_progress = 0;
+    Status status;
+    Future<> pending_blocks_completed = Future<>::MakeFinished(Status::OK());
+  };
+  std::shared_ptr<UploadState> upload_state_;
+
   Blobs::CommitBlockListOptions commit_block_list_options_;
 };
 
diff --git a/cpp/src/arrow/filesystem/azurefs.h 
b/cpp/src/arrow/filesystem/azurefs.h
index 072b061eeb..ebbe00c4ee 100644
--- a/cpp/src/arrow/filesystem/azurefs.h
+++ b/cpp/src/arrow/filesystem/azurefs.h
@@ -112,6 +112,9 @@ struct ARROW_EXPORT AzureOptions {
   /// This will be ignored if non-empty metadata is passed to OpenOutputStream.
   std::shared_ptr<const KeyValueMetadata> default_metadata;
 
+  /// Whether OutputStream writes will be issued in the background, without 
blocking.
+  bool background_writes = true;
+
  private:
   enum class CredentialKind {
     kDefault,
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc 
b/cpp/src/arrow/filesystem/azurefs_test.cc
index 5ff241b17f..9d437d1f83 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -39,6 +39,7 @@
 #include <memory>
 #include <random>
 #include <string>
+#include <vector>
 
 #include <gmock/gmock-matchers.h>
 #include <gmock/gmock-more-matchers.h>
@@ -53,6 +54,7 @@
 #include "arrow/status.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/util.h"
+#include "arrow/util/future.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
@@ -566,6 +568,7 @@ class TestAzureOptions : public ::testing::Test {
     ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme);
     ASSERT_EQ(options.credential_kind_, 
AzureOptions::CredentialKind::kDefault);
     ASSERT_EQ(path, "container/dir/blob");
+    ASSERT_EQ(options.background_writes, true);
   }
 
   void TestFromUriDfsStorage() {
@@ -582,6 +585,7 @@ class TestAzureOptions : public ::testing::Test {
     ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme);
     ASSERT_EQ(options.credential_kind_, 
AzureOptions::CredentialKind::kDefault);
     ASSERT_EQ(path, "file_system/dir/file");
+    ASSERT_EQ(options.background_writes, true);
   }
 
   void TestFromUriAbfs() {
@@ -597,6 +601,7 @@ class TestAzureOptions : public ::testing::Test {
     ASSERT_EQ(options.dfs_storage_scheme, "https");
     ASSERT_EQ(options.credential_kind_, 
AzureOptions::CredentialKind::kStorageSharedKey);
     ASSERT_EQ(path, "container/dir/blob");
+    ASSERT_EQ(options.background_writes, true);
   }
 
   void TestFromUriAbfss() {
@@ -612,6 +617,7 @@ class TestAzureOptions : public ::testing::Test {
     ASSERT_EQ(options.dfs_storage_scheme, "https");
     ASSERT_EQ(options.credential_kind_, 
AzureOptions::CredentialKind::kStorageSharedKey);
     ASSERT_EQ(path, "container/dir/blob");
+    ASSERT_EQ(options.background_writes, true);
   }
 
   void TestFromUriEnableTls() {
@@ -628,6 +634,17 @@ class TestAzureOptions : public ::testing::Test {
     ASSERT_EQ(options.dfs_storage_scheme, "http");
     ASSERT_EQ(options.credential_kind_, 
AzureOptions::CredentialKind::kStorageSharedKey);
     ASSERT_EQ(path, "container/dir/blob");
+    ASSERT_EQ(options.background_writes, true);
+  }
+
+  void TestFromUriDisableBackgroundWrites() {
+    std::string path;
+    ASSERT_OK_AND_ASSIGN(auto options,
+                         AzureOptions::FromUri(
+                             
"abfs://account:[email protected]:10000/container/dir/blob?"
+                             "background_writes=false",
+                             &path));
+    ASSERT_EQ(options.background_writes, false);
   }
 
   void TestFromUriCredentialDefault() {
@@ -773,6 +790,9 @@ TEST_F(TestAzureOptions, FromUriDfsStorage) { 
TestFromUriDfsStorage(); }
 TEST_F(TestAzureOptions, FromUriAbfs) { TestFromUriAbfs(); }
 TEST_F(TestAzureOptions, FromUriAbfss) { TestFromUriAbfss(); }
 TEST_F(TestAzureOptions, FromUriEnableTls) { TestFromUriEnableTls(); }
+TEST_F(TestAzureOptions, FromUriDisableBackgroundWrites) {
+  TestFromUriDisableBackgroundWrites();
+}
 TEST_F(TestAzureOptions, FromUriCredentialDefault) { 
TestFromUriCredentialDefault(); }
 TEST_F(TestAzureOptions, FromUriCredentialAnonymous) { 
TestFromUriCredentialAnonymous(); }
 TEST_F(TestAzureOptions, FromUriCredentialStorageSharedKey) {
@@ -929,8 +949,9 @@ class TestAzureFileSystem : public ::testing::Test {
   void UploadLines(const std::vector<std::string>& lines, const std::string& 
path,
                    int total_size) {
     ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {}));
-    const auto all_lines = std::accumulate(lines.begin(), lines.end(), 
std::string(""));
-    ASSERT_OK(output->Write(all_lines));
+    for (auto const& line : lines) {
+      ASSERT_OK(output->Write(line.data(), line.size()));
+    }
     ASSERT_OK(output->Close());
   }
 
@@ -1474,6 +1495,162 @@ class TestAzureFileSystem : public ::testing::Test {
     arrow::fs::AssertFileInfo(fs(), data.Path("dir/file0"), FileType::File);
   }
 
+  void AssertObjectContents(AzureFileSystem* fs, std::string_view path,
+                            std::string_view expected) {
+    ASSERT_OK_AND_ASSIGN(auto input, fs->OpenInputStream(std::string{path}));
+    std::string contents;
+    std::shared_ptr<Buffer> buffer;
+    do {
+      ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
+      contents.append(buffer->ToString());
+    } while (buffer->size() != 0);
+
+    EXPECT_EQ(expected, contents);
+  }
+
+  void TestOpenOutputStreamSmall() {
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+    auto data = SetUpPreexistingData();
+    const auto path = data.ContainerPath("test-write-object");
+    ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+    const std::string_view expected(PreexistingData::kLoremIpsum);
+    ASSERT_OK(output->Write(expected));
+    ASSERT_OK(output->Close());
+
+    // Verify we can read the object back.
+    AssertObjectContents(fs.get(), path, expected);
+  }
+
+  void TestOpenOutputStreamLarge() {
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+    auto data = SetUpPreexistingData();
+    const auto path = data.ContainerPath("test-write-object");
+    ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+
+    // Upload 5 MB, 4 MB und 2 MB and a very small write to test varying sizes
+    std::vector<std::int64_t> sizes{5 * 1024 * 1024, 4 * 1024 * 1024, 2 * 1024 
* 1024,
+                                    2000};
+
+    std::vector<std::string> buffers{};
+    char current_char = 'A';
+    for (const auto size : sizes) {
+      buffers.emplace_back(size, current_char++);
+    }
+
+    auto expected_size = std::int64_t{0};
+    for (size_t i = 0; i < buffers.size(); ++i) {
+      ASSERT_OK(output->Write(buffers[i]));
+      expected_size += sizes[i];
+      ASSERT_EQ(expected_size, output->Tell());
+    }
+    ASSERT_OK(output->Close());
+
+    AssertObjectContents(fs.get(), path,
+                         buffers[0] + buffers[1] + buffers[2] + buffers[3]);
+  }
+
+  void TestOpenOutputStreamLargeSingleWrite() {
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+    auto data = SetUpPreexistingData();
+    const auto path = data.ContainerPath("test-write-object");
+    ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+
+    constexpr std::int64_t size{12 * 1024 * 1024};
+    const std::string large_string(size, 'X');
+
+    ASSERT_OK(output->Write(large_string));
+    ASSERT_EQ(size, output->Tell());
+    ASSERT_OK(output->Close());
+
+    AssertObjectContents(fs.get(), path, large_string);
+  }
+
+  void TestOpenOutputStreamCloseAsync() {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+    // This false positive leak is similar to the one pinpointed in the
+    // have_false_positive_memory_leak_with_generator() comments above,
+    // though the stack trace is different. It happens when a block list
+    // is committed from a background thread.
+    //
+    // clang-format off
+    // Direct leak of 968 byte(s) in 1 object(s) allocated from:
+    //   #0 calloc
+    //   #1 (/lib/x86_64-linux-gnu/libxml2.so.2+0xe25a4)
+    //   #2 __xmlDefaultBufferSize
+    //   #3 xmlBufferCreate
+    //   #4 Azure::Storage::_internal::XmlWriter::XmlWriter()
+    //   #5 Azure::Storage::Blobs::_detail::BlockBlobClient::CommitBlockList
+    //   #6 Azure::Storage::Blobs::BlockBlobClient::CommitBlockList
+    //   #7 arrow::fs::(anonymous namespace)::CommitBlockList
+    //   #8 arrow::fs::(anonymous 
namespace)::ObjectAppendStream::FlushAsync()::'lambda'
+    // clang-format on
+    //
+    // TODO perhaps remove this skip once we can rely on
+    // https://github.com/Azure/azure-sdk-for-cpp/pull/5767
+    //
+    // Also note that ClickHouse has a workaround for a similar issue:
+    // https://github.com/ClickHouse/ClickHouse/pull/45796
+    if (options_.background_writes) {
+      GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync";
+    }
+#endif
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+    auto data = SetUpPreexistingData();
+    const std::string path = data.ContainerPath("test-write-object");
+    constexpr auto payload = PreexistingData::kLoremIpsum;
+
+    ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
+    ASSERT_OK(stream->Write(payload));
+    auto close_fut = stream->CloseAsync();
+
+    ASSERT_OK(close_fut.MoveResult());
+
+    AssertObjectContents(fs.get(), path, payload);
+  }
+
+  void TestOpenOutputStreamCloseAsyncDestructor() {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+    // See above.
+    if (options_.background_writes) {
+      GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync";
+    }
+#endif
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+    auto data = SetUpPreexistingData();
+    const std::string path = data.ContainerPath("test-write-object");
+    constexpr auto payload = PreexistingData::kLoremIpsum;
+
+    ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
+    ASSERT_OK(stream->Write(payload));
+    // Destructor implicitly closes stream and completes the upload.
+    // Testing it doesn't matter whether flush is triggered asynchronously
+    // after CloseAsync or synchronously after stream.reset() since we're just
+    // checking that the future keeps the stream alive until completion
+    // rather than segfaulting on a dangling stream.
+    auto close_fut = stream->CloseAsync();
+    stream.reset();
+    ASSERT_OK(close_fut.MoveResult());
+
+    AssertObjectContents(fs.get(), path, payload);
+  }
+
+  void TestOpenOutputStreamDestructor() {
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+    constexpr auto* payload = "new data";
+    auto data = SetUpPreexistingData();
+    const std::string path = data.ContainerPath("test-write-object");
+
+    ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
+    ASSERT_OK(stream->Write(payload));
+    // Destructor implicitly closes stream and completes the multipart upload.
+    stream.reset();
+
+    AssertObjectContents(fs.get(), path, payload);
+  }
+
  private:
   using StringMatcher =
       
::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>;
@@ -2704,53 +2881,27 @@ TEST_F(TestAzuriteFileSystem, WriteMetadataHttpHeaders) 
{
   ASSERT_EQ("text/plain", content_type);
 }
 
-TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {
-  auto data = SetUpPreexistingData();
-  const auto path = data.ContainerPath("test-write-object");
-  ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {}));
-  const std::string_view expected(PreexistingData::kLoremIpsum);
-  ASSERT_OK(output->Write(expected));
-  ASSERT_OK(output->Close());
-
-  // Verify we can read the object back.
-  ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(path));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmallNoBackgroundWrites) {
+  options_.background_writes = false;
+  TestOpenOutputStreamSmall();
+}
 
-  std::array<char, 1024> inbuf{};
-  ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data()));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { 
TestOpenOutputStreamSmall(); }
 
-  EXPECT_EQ(expected, std::string_view(inbuf.data(), size));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeNoBackgroundWrites) {
+  options_.background_writes = false;
+  TestOpenOutputStreamLarge();
 }
 
-TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) {
-  auto data = SetUpPreexistingData();
-  const auto path = data.ContainerPath("test-write-object");
-  ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {}));
-  std::array<std::int64_t, 3> sizes{257 * 1024, 258 * 1024, 259 * 1024};
-  std::array<std::string, 3> buffers{
-      std::string(sizes[0], 'A'),
-      std::string(sizes[1], 'B'),
-      std::string(sizes[2], 'C'),
-  };
-  auto expected = std::int64_t{0};
-  for (auto i = 0; i != 3; ++i) {
-    ASSERT_OK(output->Write(buffers[i]));
-    expected += sizes[i];
-    ASSERT_EQ(expected, output->Tell());
-  }
-  ASSERT_OK(output->Close());
-
-  // Verify we can read the object back.
-  ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(path));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) { 
TestOpenOutputStreamLarge(); }
 
-  std::string contents;
-  std::shared_ptr<Buffer> buffer;
-  do {
-    ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
-    ASSERT_TRUE(buffer);
-    contents.append(buffer->ToString());
-  } while (buffer->size() != 0);
+TEST_F(TestAzuriteFileSystem, 
OpenOutputStreamLargeSingleWriteNoBackgroundWrites) {
+  options_.background_writes = false;
+  TestOpenOutputStreamLargeSingleWrite();
+}
 
-  EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]);
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeSingleWrite) {
+  TestOpenOutputStreamLargeSingleWrite();
 }
 
 TEST_F(TestAzuriteFileSystem, OpenOutputStreamTruncatesExistingFile) {
@@ -2820,6 +2971,33 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamClosed) {
   ASSERT_RAISES(Invalid, output->Tell());
 }
 
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsync) {
+  TestOpenOutputStreamCloseAsync();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsyncNoBackgroundWrites) {
+  options_.background_writes = false;
+  TestOpenOutputStreamCloseAsync();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructor) {
+  TestOpenOutputStreamCloseAsyncDestructor();
+}
+
+TEST_F(TestAzuriteFileSystem, 
OpenOutputStreamAsyncDestructorNoBackgroundWrites) {
+  options_.background_writes = false;
+  TestOpenOutputStreamCloseAsyncDestructor();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructor) {
+  TestOpenOutputStreamDestructor();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructorNoBackgroundWrites) {
+  options_.background_writes = false;
+  TestOpenOutputStreamDestructor();
+}
+
 TEST_F(TestAzuriteFileSystem, OpenOutputStreamUri) {
   auto data = SetUpPreexistingData();
   const auto path = data.ContainerPath("open-output-stream-uri.txt");

Reply via email to