This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new e1e7c50101 GH-40036: [C++] Azure file system write buffering & async writes (#43096)
e1e7c50101 is described below
commit e1e7c501019ac26c896d61fa0c129eee83da9b55
Author: Oliver Layer <[email protected]>
AuthorDate: Wed Aug 21 13:22:57 2024 +0200
GH-40036: [C++] Azure file system write buffering & async writes (#43096)
### Rationale for this change
See #40036.
### What changes are included in this PR?
Write buffering and async writes (similar to what the S3 file system does)
in the `ObjectAppendStream` for the Azure file system.
With write buffering and async writes, the runtime of input scenario creation
in the tests (which uses the `ObjectAppendStream` against Azurite) decreased
from ~25s (see [here](https://github.com/apache/arrow/issues/40036)) to ~800ms:
```
[ RUN ] TestAzuriteFileSystem.OpenInputFileMixedReadVsReadAt
[ OK ] TestAzuriteFileSystem.OpenInputFileMixedReadVsReadAt (787 ms)
```
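For context, here is a minimal usage sketch of how the stream is driven through the filesystem API; the account name and blob path are made up for illustration, and error handling is reduced to status propagation:

```cpp
#include "arrow/filesystem/azurefs.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status WriteManySmallChunks() {
  arrow::fs::AzureOptions options;
  options.account_name = "myaccount";  // hypothetical account
  ARROW_RETURN_NOT_OK(options.ConfigureDefaultCredential());
  ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::AzureFileSystem::Make(options));

  // Small writes are accumulated in an in-memory buffer; once a full block
  // (10 MiB) has been gathered it is staged, in the background when
  // background_writes is enabled.
  ARROW_ASSIGN_OR_RAISE(auto stream,
                        fs->OpenOutputStream("container/dir/blob"));
  for (int i = 0; i < 1000; ++i) {
    ARROW_RETURN_NOT_OK(stream->Write("some data"));
  }
  // Close() uploads the remaining buffered bytes, waits for any pending
  // background block uploads and commits the block list.
  return stream->Close();
}
```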
### Are these changes tested?
Added some tests with background writes enabled and disabled (some were
taken from the S3 tests). Everything that changed should be covered.
### Are there any user-facing changes?
`AzureOptions` now allows for `background_writes` to be set (default:
true). No breaking changes.
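For illustration, a sketch of both ways to toggle the option (the connection string is a placeholder mirroring the new `TestFromUriDisableBackgroundWrites` test):

```cpp
#include <string>

#include "arrow/filesystem/azurefs.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status ConfigureBackgroundWrites() {
  // Directly on the options struct; the default stays true.
  arrow::fs::AzureOptions options;
  options.background_writes = false;

  // Or via the new URI query parameter (account, key and host are
  // placeholders taken from the Azurite test setup).
  std::string path;
  ARROW_ASSIGN_OR_RAISE(
      auto uri_options,
      arrow::fs::AzureOptions::FromUri(
          "abfs://account:[email protected]:10000/container/dir/blob?"
          "background_writes=false",
          &path));
  // uri_options.background_writes is now false.
  return arrow::Status::OK();
}
```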
### Notes
- The code in `DoWrite` is very similar to [the code in the S3 FS](https://github.com/apache/arrow/blob/edfa343eeca008513f0300924380e1b187cc976b/cpp/src/arrow/filesystem/s3fs.cc#L1753).
Maybe this could be unified? I didn't see that as being in scope for this PR, though.
* GitHub Issue: #40036
Lead-authored-by: Oliver Layer <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/filesystem/azurefs.cc | 276 +++++++++++++++++++++++++++----
cpp/src/arrow/filesystem/azurefs.h | 3 +
cpp/src/arrow/filesystem/azurefs_test.cc | 264 ++++++++++++++++++++++++-----
3 files changed, 471 insertions(+), 72 deletions(-)
diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc
index 9b3c0c0c1d..0bad856339 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -22,6 +22,7 @@
#include "arrow/filesystem/azurefs.h"
#include "arrow/filesystem/azurefs_internal.h"
+#include "arrow/io/memory.h"
// idenfity.hpp triggers -Wattributes warnings cause -Werror builds to fail,
// so disable it for this file with pragmas.
@@ -144,6 +145,9 @@ Status AzureOptions::ExtractFromUriQuery(const Uri& uri) {
blob_storage_scheme = "http";
dfs_storage_scheme = "http";
}
+ } else if (kv.first == "background_writes") {
+ ARROW_ASSIGN_OR_RAISE(background_writes,
+ ::arrow::internal::ParseBoolean(kv.second));
} else {
return Status::Invalid(
"Unexpected query parameter in Azure Blob File System URI: '",
kv.first, "'");
@@ -937,8 +941,8 @@ Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_bl
const std::vector<std::string>& block_ids,
const Blobs::CommitBlockListOptions& options) {
try {
- // CommitBlockList puts all block_ids in the latest element. That means in the case of
- // overlapping block_ids the newly staged block ids will always replace the
+ // CommitBlockList puts all block_ids in the latest element. That means in the case
+ // of overlapping block_ids the newly staged block ids will always replace the
// previously committed blocks.
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
block_blob_client->CommitBlockList(block_ids, options);
@@ -950,7 +954,34 @@ Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_bl
return Status::OK();
}
+Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const std::string& id,
+ Core::IO::MemoryBodyStream& content) {
+ try {
+ block_blob_client->StageBlock(id, content);
+ } catch (const Storage::StorageException& exception) {
+ return ExceptionToStatus(
+ exception, "StageBlock failed for '", block_blob_client->GetUrl(),
+ "' new_block_id: '", id,
+ "'. Staging new blocks is fundamental to streaming writes to blob storage.");
+ }
+
+ return Status::OK();
+}
+
+/// Writes will be buffered up to this size (in bytes) before actually uploading them.
+static constexpr int64_t kBlockUploadSizeBytes = 10 * 1024 * 1024;
+/// The maximum size of a block in Azure Blob (as per docs).
+static constexpr int64_t kMaxBlockSizeBytes = 4UL * 1024 * 1024 * 1024;
+
+/// This output stream, similar to other arrow OutputStreams, is not thread-safe.
class ObjectAppendStream final : public io::OutputStream {
+ private:
+ struct UploadState;
+
+ std::shared_ptr<ObjectAppendStream> Self() {
+ return std::dynamic_pointer_cast<ObjectAppendStream>(shared_from_this());
+ }
+
public:
ObjectAppendStream(std::shared_ptr<Blobs::BlockBlobClient> block_blob_client,
const io::IOContext& io_context, const AzureLocation& location,
@@ -958,7 +989,8 @@ class ObjectAppendStream final : public io::OutputStream {
const AzureOptions& options)
: block_blob_client_(std::move(block_blob_client)),
io_context_(io_context),
- location_(location) {
+ location_(location),
+ background_writes_(options.background_writes) {
if (metadata && metadata->size() != 0) {
ArrowMetadataToCommitBlockListOptions(metadata, commit_block_list_options_);
} else if (options.default_metadata && options.default_metadata->size() != 0) {
@@ -1008,10 +1040,13 @@ class ObjectAppendStream final : public io::OutputStream {
content_length_ = 0;
}
}
+
+ upload_state_ = std::make_shared<UploadState>();
+
if (content_length_ > 0) {
ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
for (auto block : block_list.CommittedBlocks) {
- block_ids_.push_back(block.Name);
+ upload_state_->block_ids.push_back(block.Name);
}
}
initialised_ = true;
@@ -1031,12 +1066,34 @@ class ObjectAppendStream final : public io::OutputStream {
if (closed_) {
return Status::OK();
}
+
+ if (current_block_) {
+ // Upload remaining buffer
+ RETURN_NOT_OK(AppendCurrentBlock());
+ }
+
RETURN_NOT_OK(Flush());
block_blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}
+ Future<> CloseAsync() override {
+ if (closed_) {
+ return Status::OK();
+ }
+
+ if (current_block_) {
+ // Upload remaining buffer
+ RETURN_NOT_OK(AppendCurrentBlock());
+ }
+
+ return FlushAsync().Then([self = Self()]() {
+ self->block_blob_client_ = nullptr;
+ self->closed_ = true;
+ });
+ }
+
bool closed() const override { return closed_; }
Status CheckClosed(const char* action) const {
@@ -1052,11 +1109,11 @@ class ObjectAppendStream final : public io::OutputStream {
}
Status Write(const std::shared_ptr<Buffer>& buffer) override {
- return DoAppend(buffer->data(), buffer->size(), buffer);
+ return DoWrite(buffer->data(), buffer->size(), buffer);
}
Status Write(const void* data, int64_t nbytes) override {
- return DoAppend(data, nbytes);
+ return DoWrite(data, nbytes);
}
Status Flush() override {
@@ -1066,20 +1123,111 @@ class ObjectAppendStream final : public io::OutputStream {
// flush. This also avoids some unhandled errors when flushing in the destructor.
return Status::OK();
}
- return CommitBlockList(block_blob_client_, block_ids_, commit_block_list_options_);
+
+ Future<> pending_blocks_completed;
+ {
+ std::unique_lock<std::mutex> lock(upload_state_->mutex);
+ pending_blocks_completed = upload_state_->pending_blocks_completed;
+ }
+
+ RETURN_NOT_OK(pending_blocks_completed.status());
+ std::unique_lock<std::mutex> lock(upload_state_->mutex);
+ return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+ commit_block_list_options_);
}
- private:
- Status DoAppend(const void* data, int64_t nbytes,
- std::shared_ptr<Buffer> owned_buffer = nullptr) {
- RETURN_NOT_OK(CheckClosed("append"));
- auto append_data = reinterpret_cast<const uint8_t*>(data);
- Core::IO::MemoryBodyStream block_content(append_data, nbytes);
- if (block_content.Length() == 0) {
+ Future<> FlushAsync() {
+ RETURN_NOT_OK(CheckClosed("flush async"));
+ if (!initialised_) {
+ // If the stream has not been successfully initialized then there is nothing to
+ // flush. This also avoids some unhandled errors when flushing in the destructor.
return Status::OK();
}
- const auto n_block_ids = block_ids_.size();
+ Future<> pending_blocks_completed;
+ {
+ std::unique_lock<std::mutex> lock(upload_state_->mutex);
+ pending_blocks_completed = upload_state_->pending_blocks_completed;
+ }
+
+ return pending_blocks_completed.Then([self = Self()] {
+ std::unique_lock<std::mutex> lock(self->upload_state_->mutex);
+ return CommitBlockList(self->block_blob_client_, self->upload_state_->block_ids,
+ self->commit_block_list_options_);
+ });
+ }
+
+ private:
+ Status AppendCurrentBlock() {
+ ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
+ current_block_.reset();
+ current_block_size_ = 0;
+ return AppendBlock(buf);
+ }
+
+ Status DoWrite(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ if (closed_) {
+ return Status::Invalid("Operation on closed stream");
+ }
+
+ const auto* data_ptr = reinterpret_cast<const int8_t*>(data);
+ auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) {
+ data_ptr += offset;
+ nbytes -= offset;
+ pos_ += offset;
+ content_length_ += offset;
+ };
+
+ // Handle case where we have some bytes buffered from prior calls.
+ if (current_block_size_ > 0) {
+ // Try to fill current buffer
+ const int64_t to_copy =
+ std::min(nbytes, kBlockUploadSizeBytes - current_block_size_);
+ RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy));
+ current_block_size_ += to_copy;
+ advance_ptr(to_copy);
+
+ // If buffer isn't full, break
+ if (current_block_size_ < kBlockUploadSizeBytes) {
+ return Status::OK();
+ }
+
+ // Upload current buffer
+ RETURN_NOT_OK(AppendCurrentBlock());
+ }
+
+ // We can upload chunks without copying them into a buffer
+ while (nbytes >= kBlockUploadSizeBytes) {
+ const auto upload_size = std::min(nbytes, kMaxBlockSizeBytes);
+ RETURN_NOT_OK(AppendBlock(data_ptr, upload_size));
+ advance_ptr(upload_size);
+ }
+
+ // Buffer remaining bytes
+ if (nbytes > 0) {
+ current_block_size_ = nbytes;
+
+ if (current_block_ == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(
+ current_block_,
+ io::BufferOutputStream::Create(kBlockUploadSizeBytes, io_context_.pool()));
+ } else {
+ // Re-use the allocation from before.
+ RETURN_NOT_OK(current_block_->Reset(kBlockUploadSizeBytes, io_context_.pool()));
+ }
+
+ RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_));
+ pos_ += current_block_size_;
+ content_length_ += current_block_size_;
+ }
+
+ return Status::OK();
+ }
+
+ std::string CreateBlock() {
+ std::unique_lock<std::mutex> lock(upload_state_->mutex);
+ const auto n_block_ids = upload_state_->block_ids.size();
// New block ID must always be distinct from the existing block IDs. Otherwise we
// will accidentally replace the content of existing blocks, causing corruption.
@@ -1093,36 +1241,106 @@ class ObjectAppendStream final : public io::OutputStream {
new_block_id.insert(0, required_padding_digits, '0');
// There is a small risk when appending to a blob created by another client that
// `new_block_id` may overlapping with an existing block id. Adding the `-arrow`
- // suffix significantly reduces the risk, but does not 100% eliminate it. For example
- // if the blob was previously created with one block, with id `00001-arrow` then the
- // next block we append will conflict with that, and cause corruption.
+ // suffix significantly reduces the risk, but does not 100% eliminate it. For
+ // example if the blob was previously created with one block, with id `00001-arrow`
+ // then the next block we append will conflict with that, and cause corruption.
new_block_id += "-arrow";
new_block_id = Core::Convert::Base64Encode(
std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));
- try {
- block_blob_client_->StageBlock(new_block_id, block_content);
- } catch (const Storage::StorageException& exception) {
- return ExceptionToStatus(
- exception, "StageBlock failed for '", block_blob_client_->GetUrl(),
- "' new_block_id: '", new_block_id,
- "'. Staging new blocks is fundamental to streaming writes to blob
storage.");
+ upload_state_->block_ids.push_back(new_block_id);
+
+ // We only use the future if we have background writes enabled. Without background
+ // writes the future is initialized as finished and not mutated any more.
+ if (background_writes_ && upload_state_->blocks_in_progress++ == 0) {
+ upload_state_->pending_blocks_completed = Future<>::Make();
}
- block_ids_.push_back(new_block_id);
- pos_ += nbytes;
- content_length_ += nbytes;
+
+ return new_block_id;
+ }
+
+ Status AppendBlock(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckClosed("append"));
+
+ if (nbytes == 0) {
+ return Status::OK();
+ }
+
+ const auto block_id = CreateBlock();
+
+ if (background_writes_) {
+ if (owned_buffer == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
+ memcpy(owned_buffer->mutable_data(), data, nbytes);
+ } else {
+ DCHECK_EQ(data, owned_buffer->data());
+ DCHECK_EQ(nbytes, owned_buffer->size());
+ }
+
+ // The closure keeps the buffer and the upload state alive
+ auto deferred = [owned_buffer, block_id, block_blob_client = block_blob_client_,
+ state = upload_state_]() mutable -> Status {
+ Core::IO::MemoryBodyStream block_content(owned_buffer->data(),
+ owned_buffer->size());
+
+ auto status = StageBlock(block_blob_client.get(), block_id, block_content);
+ HandleUploadOutcome(state, status);
+ return Status::OK();
+ };
+ RETURN_NOT_OK(io::internal::SubmitIO(io_context_, std::move(deferred)));
+ } else {
+ auto append_data = reinterpret_cast<const uint8_t*>(data);
+ Core::IO::MemoryBodyStream block_content(append_data, nbytes);
+
+ RETURN_NOT_OK(StageBlock(block_blob_client_.get(), block_id, block_content));
+ }
+
return Status::OK();
}
+ Status AppendBlock(std::shared_ptr<Buffer> buffer) {
+ return AppendBlock(buffer->data(), buffer->size(), buffer);
+ }
+
+ static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
+ const Status& status) {
+ std::unique_lock<std::mutex> lock(state->mutex);
+ if (!status.ok()) {
+ state->status &= status;
+ }
+ // Notify completion
+ if (--state->blocks_in_progress == 0) {
+ auto fut = state->pending_blocks_completed;
+ lock.unlock();
+ fut.MarkFinished(state->status);
+ }
+ }
+
std::shared_ptr<Blobs::BlockBlobClient> block_blob_client_;
const io::IOContext io_context_;
const AzureLocation location_;
+ const bool background_writes_;
int64_t content_length_ = kNoSize;
+ std::shared_ptr<io::BufferOutputStream> current_block_;
+ int64_t current_block_size_ = 0;
+
bool closed_ = false;
bool initialised_ = false;
int64_t pos_ = 0;
- std::vector<std::string> block_ids_;
+
+ // This struct is kept alive through background writes to avoid problems
+ // in the completion handler.
+ struct UploadState {
+ std::mutex mutex;
+ std::vector<std::string> block_ids;
+ int64_t blocks_in_progress = 0;
+ Status status;
+ Future<> pending_blocks_completed = Future<>::MakeFinished(Status::OK());
+ };
+ std::shared_ptr<UploadState> upload_state_;
+
Blobs::CommitBlockListOptions commit_block_list_options_;
};
diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h
index 072b061eeb..ebbe00c4ee 100644
--- a/cpp/src/arrow/filesystem/azurefs.h
+++ b/cpp/src/arrow/filesystem/azurefs.h
@@ -112,6 +112,9 @@ struct ARROW_EXPORT AzureOptions {
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
std::shared_ptr<const KeyValueMetadata> default_metadata;
+ /// Whether OutputStream writes will be issued in the background, without blocking.
+ bool background_writes = true;
+
private:
enum class CredentialKind {
kDefault,
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc
index 5ff241b17f..9d437d1f83 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -39,6 +39,7 @@
#include <memory>
#include <random>
#include <string>
+#include <vector>
#include <gmock/gmock-matchers.h>
#include <gmock/gmock-more-matchers.h>
@@ -53,6 +54,7 @@
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
+#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
@@ -566,6 +568,7 @@ class TestAzureOptions : public ::testing::Test {
ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme);
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault);
ASSERT_EQ(path, "container/dir/blob");
+ ASSERT_EQ(options.background_writes, true);
}
void TestFromUriDfsStorage() {
@@ -582,6 +585,7 @@ class TestAzureOptions : public ::testing::Test {
ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme);
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault);
ASSERT_EQ(path, "file_system/dir/file");
+ ASSERT_EQ(options.background_writes, true);
}
void TestFromUriAbfs() {
@@ -597,6 +601,7 @@ class TestAzureOptions : public ::testing::Test {
ASSERT_EQ(options.dfs_storage_scheme, "https");
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey);
ASSERT_EQ(path, "container/dir/blob");
+ ASSERT_EQ(options.background_writes, true);
}
void TestFromUriAbfss() {
@@ -612,6 +617,7 @@ class TestAzureOptions : public ::testing::Test {
ASSERT_EQ(options.dfs_storage_scheme, "https");
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey);
ASSERT_EQ(path, "container/dir/blob");
+ ASSERT_EQ(options.background_writes, true);
}
void TestFromUriEnableTls() {
@@ -628,6 +634,17 @@ class TestAzureOptions : public ::testing::Test {
ASSERT_EQ(options.dfs_storage_scheme, "http");
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey);
ASSERT_EQ(path, "container/dir/blob");
+ ASSERT_EQ(options.background_writes, true);
+ }
+
+ void TestFromUriDisableBackgroundWrites() {
+ std::string path;
+ ASSERT_OK_AND_ASSIGN(auto options,
+ AzureOptions::FromUri(
+ "abfs://account:[email protected]:10000/container/dir/blob?"
+ "background_writes=false",
+ &path));
+ ASSERT_EQ(options.background_writes, false);
}
void TestFromUriCredentialDefault() {
@@ -773,6 +790,9 @@ TEST_F(TestAzureOptions, FromUriDfsStorage) { TestFromUriDfsStorage(); }
TEST_F(TestAzureOptions, FromUriAbfs) { TestFromUriAbfs(); }
TEST_F(TestAzureOptions, FromUriAbfss) { TestFromUriAbfss(); }
TEST_F(TestAzureOptions, FromUriEnableTls) { TestFromUriEnableTls(); }
+TEST_F(TestAzureOptions, FromUriDisableBackgroundWrites) {
+ TestFromUriDisableBackgroundWrites();
+}
TEST_F(TestAzureOptions, FromUriCredentialDefault) { TestFromUriCredentialDefault(); }
TEST_F(TestAzureOptions, FromUriCredentialAnonymous) { TestFromUriCredentialAnonymous(); }
TEST_F(TestAzureOptions, FromUriCredentialStorageSharedKey) {
@@ -929,8 +949,9 @@ class TestAzureFileSystem : public ::testing::Test {
void UploadLines(const std::vector<std::string>& lines, const std::string& path,
int total_size) {
ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {}));
- const auto all_lines = std::accumulate(lines.begin(), lines.end(), std::string(""));
- ASSERT_OK(output->Write(all_lines));
+ for (auto const& line : lines) {
+ ASSERT_OK(output->Write(line.data(), line.size()));
+ }
ASSERT_OK(output->Close());
}
@@ -1474,6 +1495,162 @@ class TestAzureFileSystem : public ::testing::Test {
arrow::fs::AssertFileInfo(fs(), data.Path("dir/file0"), FileType::File);
}
+ void AssertObjectContents(AzureFileSystem* fs, std::string_view path,
+ std::string_view expected) {
+ ASSERT_OK_AND_ASSIGN(auto input, fs->OpenInputStream(std::string{path}));
+ std::string contents;
+ std::shared_ptr<Buffer> buffer;
+ do {
+ ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
+ contents.append(buffer->ToString());
+ } while (buffer->size() != 0);
+
+ EXPECT_EQ(expected, contents);
+ }
+
+ void TestOpenOutputStreamSmall() {
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+ auto data = SetUpPreexistingData();
+ const auto path = data.ContainerPath("test-write-object");
+ ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+ const std::string_view expected(PreexistingData::kLoremIpsum);
+ ASSERT_OK(output->Write(expected));
+ ASSERT_OK(output->Close());
+
+ // Verify we can read the object back.
+ AssertObjectContents(fs.get(), path, expected);
+ }
+
+ void TestOpenOutputStreamLarge() {
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+ auto data = SetUpPreexistingData();
+ const auto path = data.ContainerPath("test-write-object");
+ ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+
+ // Upload 5 MB, 4 MB and 2 MB and a very small write to test varying sizes
+ std::vector<std::int64_t> sizes{5 * 1024 * 1024, 4 * 1024 * 1024, 2 * 1024 * 1024,
+ 2000};
+
+ std::vector<std::string> buffers{};
+ char current_char = 'A';
+ for (const auto size : sizes) {
+ buffers.emplace_back(size, current_char++);
+ }
+
+ auto expected_size = std::int64_t{0};
+ for (size_t i = 0; i < buffers.size(); ++i) {
+ ASSERT_OK(output->Write(buffers[i]));
+ expected_size += sizes[i];
+ ASSERT_EQ(expected_size, output->Tell());
+ }
+ ASSERT_OK(output->Close());
+
+ AssertObjectContents(fs.get(), path,
+ buffers[0] + buffers[1] + buffers[2] + buffers[3]);
+ }
+
+ void TestOpenOutputStreamLargeSingleWrite() {
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+ auto data = SetUpPreexistingData();
+ const auto path = data.ContainerPath("test-write-object");
+ ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+
+ constexpr std::int64_t size{12 * 1024 * 1024};
+ const std::string large_string(size, 'X');
+
+ ASSERT_OK(output->Write(large_string));
+ ASSERT_EQ(size, output->Tell());
+ ASSERT_OK(output->Close());
+
+ AssertObjectContents(fs.get(), path, large_string);
+ }
+
+ void TestOpenOutputStreamCloseAsync() {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+ // This false positive leak is similar to the one pinpointed in the
+ // have_false_positive_memory_leak_with_generator() comments above,
+ // though the stack trace is different. It happens when a block list
+ // is committed from a background thread.
+ //
+ // clang-format off
+ // Direct leak of 968 byte(s) in 1 object(s) allocated from:
+ // #0 calloc
+ // #1 (/lib/x86_64-linux-gnu/libxml2.so.2+0xe25a4)
+ // #2 __xmlDefaultBufferSize
+ // #3 xmlBufferCreate
+ // #4 Azure::Storage::_internal::XmlWriter::XmlWriter()
+ // #5 Azure::Storage::Blobs::_detail::BlockBlobClient::CommitBlockList
+ // #6 Azure::Storage::Blobs::BlockBlobClient::CommitBlockList
+ // #7 arrow::fs::(anonymous namespace)::CommitBlockList
+ // #8 arrow::fs::(anonymous namespace)::ObjectAppendStream::FlushAsync()::'lambda'
+ // clang-format on
+ //
+ // TODO perhaps remove this skip once we can rely on
+ // https://github.com/Azure/azure-sdk-for-cpp/pull/5767
+ //
+ // Also note that ClickHouse has a workaround for a similar issue:
+ // https://github.com/ClickHouse/ClickHouse/pull/45796
+ if (options_.background_writes) {
+ GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync";
+ }
+#endif
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+ auto data = SetUpPreexistingData();
+ const std::string path = data.ContainerPath("test-write-object");
+ constexpr auto payload = PreexistingData::kLoremIpsum;
+
+ ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
+ ASSERT_OK(stream->Write(payload));
+ auto close_fut = stream->CloseAsync();
+
+ ASSERT_OK(close_fut.MoveResult());
+
+ AssertObjectContents(fs.get(), path, payload);
+ }
+
+ void TestOpenOutputStreamCloseAsyncDestructor() {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+ // See above.
+ if (options_.background_writes) {
+ GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync";
+ }
+#endif
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+ auto data = SetUpPreexistingData();
+ const std::string path = data.ContainerPath("test-write-object");
+ constexpr auto payload = PreexistingData::kLoremIpsum;
+
+ ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
+ ASSERT_OK(stream->Write(payload));
+ // Destructor implicitly closes stream and completes the upload.
+ // Testing it doesn't matter whether flush is triggered asynchronously
+ // after CloseAsync or synchronously after stream.reset() since we're just
+ // checking that the future keeps the stream alive until completion
+ // rather than segfaulting on a dangling stream.
+ auto close_fut = stream->CloseAsync();
+ stream.reset();
+ ASSERT_OK(close_fut.MoveResult());
+
+ AssertObjectContents(fs.get(), path, payload);
+ }
+
+ void TestOpenOutputStreamDestructor() {
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+ constexpr auto* payload = "new data";
+ auto data = SetUpPreexistingData();
+ const std::string path = data.ContainerPath("test-write-object");
+
+ ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
+ ASSERT_OK(stream->Write(payload));
+ // Destructor implicitly closes stream and completes the multipart upload.
+ stream.reset();
+
+ AssertObjectContents(fs.get(), path, payload);
+ }
+
private:
using StringMatcher =
::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>;
@@ -2704,53 +2881,27 @@ TEST_F(TestAzuriteFileSystem, WriteMetadataHttpHeaders) {
ASSERT_EQ("text/plain", content_type);
}
-TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {
- auto data = SetUpPreexistingData();
- const auto path = data.ContainerPath("test-write-object");
- ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {}));
- const std::string_view expected(PreexistingData::kLoremIpsum);
- ASSERT_OK(output->Write(expected));
- ASSERT_OK(output->Close());
-
- // Verify we can read the object back.
- ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(path));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmallNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamSmall();
+}
- std::array<char, 1024> inbuf{};
- ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data()));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { TestOpenOutputStreamSmall(); }
- EXPECT_EQ(expected, std::string_view(inbuf.data(), size));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamLarge();
}
-TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) {
- auto data = SetUpPreexistingData();
- const auto path = data.ContainerPath("test-write-object");
- ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {}));
- std::array<std::int64_t, 3> sizes{257 * 1024, 258 * 1024, 259 * 1024};
- std::array<std::string, 3> buffers{
- std::string(sizes[0], 'A'),
- std::string(sizes[1], 'B'),
- std::string(sizes[2], 'C'),
- };
- auto expected = std::int64_t{0};
- for (auto i = 0; i != 3; ++i) {
- ASSERT_OK(output->Write(buffers[i]));
- expected += sizes[i];
- ASSERT_EQ(expected, output->Tell());
- }
- ASSERT_OK(output->Close());
-
- // Verify we can read the object back.
- ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(path));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) { TestOpenOutputStreamLarge(); }
- std::string contents;
- std::shared_ptr<Buffer> buffer;
- do {
- ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
- ASSERT_TRUE(buffer);
- contents.append(buffer->ToString());
- } while (buffer->size() != 0);
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeSingleWriteNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamLargeSingleWrite();
+}
- EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]);
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeSingleWrite) {
+ TestOpenOutputStreamLargeSingleWrite();
}
TEST_F(TestAzuriteFileSystem, OpenOutputStreamTruncatesExistingFile) {
@@ -2820,6 +2971,33 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamClosed) {
ASSERT_RAISES(Invalid, output->Tell());
}
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsync) {
+ TestOpenOutputStreamCloseAsync();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsyncNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamCloseAsync();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructor) {
+ TestOpenOutputStreamCloseAsyncDestructor();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructorNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamCloseAsyncDestructor();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructor) {
+ TestOpenOutputStreamDestructor();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructorNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamDestructor();
+}
+
TEST_F(TestAzuriteFileSystem, OpenOutputStreamUri) {
auto data = SetUpPreexistingData();
const auto path = data.ContainerPath("open-output-stream-uri.txt");