This is an automated email from the ASF dual-hosted git repository.
felipecrv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 8a62f30d34 GH-40037: [C++][FS][Azure] Make attempted reads and writes
against directories fail fast (#40119)
8a62f30d34 is described below
commit 8a62f30d34a606c8edca6cfaad56846e0e7aceea
Author: Thomas Newton <[email protected]>
AuthorDate: Wed Feb 21 16:29:18 2024 +0000
GH-40037: [C++][FS][Azure] Make attempted reads and writes against
directories fail fast (#40119)
### Rationale for this change
Prevent confusion if a user attempts to read or write a directory.
### What changes are included in this PR?
- Make `ObjectAppendStream::Flush` a no-op if `ObjectAppendStream::Init` has
not run successfully. This avoids an unhandled error when the destructor calls
`Flush`.
- Check blob properties for directory marker metadata when initialising
`ObjectInputFile` or `ObjectAppendStream`.
- When initialising `ObjectAppendStream`, call `GetFileInfo` on flat
namespace accounts to check that the path is not a directory.
### Are these changes tested?
Add new tests `DisallowReadingOrWritingDirectoryMarkers` and
`DisallowCreatingFileAndDirectoryWithTheSameName` to cover the new fail fast
behaviour.
Also updated `WriteMetadata` to ensure that my changes to Flush didn't
break setting metadata without calling `Write` on the stream.
### Are there any user-facing changes?
Yes. Invalid read and write operations will now fail fast and gracefully.
Previously, it was possible to get into a confusing state where files and
directories existed at the same path, and some failures were ungraceful.
* Closes: #40037
Authored-by: Thomas Newton <[email protected]>
Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
---
cpp/src/arrow/filesystem/azurefs.cc | 95 +++++++++++++++++++++++++-------
cpp/src/arrow/filesystem/azurefs_test.cc | 60 ++++++++++++++++++++
2 files changed, 135 insertions(+), 20 deletions(-)
diff --git a/cpp/src/arrow/filesystem/azurefs.cc
b/cpp/src/arrow/filesystem/azurefs.cc
index de7cdba245..8ae33b8818 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -347,6 +347,22 @@ bool IsContainerNotFound(const Storage::StorageException&
e) {
return false;
}
+const auto kHierarchicalNamespaceIsDirectoryMetadataKey = "hdi_isFolder";
+const auto kFlatNamespaceIsDirectoryMetadataKey = "is_directory";
+
+bool MetadataIndicatesIsDirectory(const Storage::Metadata& metadata) {
+ // Inspired by
+ //
https://github.com/Azure/azure-sdk-for-cpp/blob/12407e8bfcb9bc1aa43b253c1d0ec93bf795ae3b/sdk/storage/azure-storage-files-datalake/src/datalake_utilities.cpp#L86-L91
+ auto hierarchical_directory_metadata =
+ metadata.find(kHierarchicalNamespaceIsDirectoryMetadataKey);
+ if (hierarchical_directory_metadata != metadata.end()) {
+ return hierarchical_directory_metadata->second == "true";
+ }
+ auto flat_directory_metadata =
metadata.find(kFlatNamespaceIsDirectoryMetadataKey);
+ return flat_directory_metadata != metadata.end() &&
+ flat_directory_metadata->second == "true";
+}
+
template <typename ArrowType>
std::string FormatValue(typename TypeTraits<ArrowType>::CType value) {
struct StringAppender {
@@ -512,11 +528,18 @@ class ObjectInputFile final : public io::RandomAccessFile
{
Status Init() {
if (content_length_ != kNoSize) {
+ // When the user provides the file size we don't validate that its a
file. This is
+ // only a read so its not a big deal if the user makes a mistake.
DCHECK_GE(content_length_, 0);
return Status::OK();
}
try {
+ // To open an ObjectInputFile the Blob must exist and it must not
represent
+ // a directory. Additionally we need to know the file size.
auto properties = blob_client_->GetProperties();
+ if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) {
+ return NotAFile(location_);
+ }
content_length_ = properties.Value.BlobSize;
metadata_ = PropertiesToMetadata(properties.Value);
return Status::OK();
@@ -698,11 +721,10 @@ class ObjectAppendStream final : public io::OutputStream {
ObjectAppendStream(std::shared_ptr<Blobs::BlockBlobClient> block_blob_client,
const io::IOContext& io_context, const AzureLocation&
location,
const std::shared_ptr<const KeyValueMetadata>& metadata,
- const AzureOptions& options, int64_t size = kNoSize)
+ const AzureOptions& options)
: block_blob_client_(std::move(block_blob_client)),
io_context_(io_context),
- location_(location),
- content_length_(size) {
+ location_(location) {
if (metadata && metadata->size() != 0) {
metadata_ = ArrowMetadataToAzureMetadata(metadata);
} else if (options.default_metadata && options.default_metadata->size() !=
0) {
@@ -716,17 +738,31 @@ class ObjectAppendStream final : public io::OutputStream {
io::internal::CloseFromDestructor(this);
}
- Status Init() {
- if (content_length_ != kNoSize) {
- DCHECK_GE(content_length_, 0);
- pos_ = content_length_;
+ Status Init(const bool truncate,
+ std::function<Status()> ensure_not_flat_namespace_directory) {
+ if (truncate) {
+ content_length_ = 0;
+ pos_ = 0;
+ // We need to create an empty file overwriting any existing file, but
+ // fail if there is an existing directory.
+ RETURN_NOT_OK(ensure_not_flat_namespace_directory());
+ // On hierarchical namespace CreateEmptyBlockBlob will fail if there is
an existing
+ // directory so we don't need to check like we do on flat namespace.
+ RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_));
} else {
try {
auto properties = block_blob_client_->GetProperties();
+ if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) {
+ return NotAFile(location_);
+ }
content_length_ = properties.Value.BlobSize;
pos_ = content_length_;
} catch (const Storage::StorageException& exception) {
if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
+ // No file exists but on flat namespace its possible there is a
directory
+ // marker or an implied directory. Ensure there is no directory
before starting
+ // a new empty file.
+ RETURN_NOT_OK(ensure_not_flat_namespace_directory());
RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_));
} else {
return ExceptionToStatus(
@@ -743,6 +779,7 @@ class ObjectAppendStream final : public io::OutputStream {
block_ids_.push_back(block.Name);
}
}
+ initialised_ = true;
return Status::OK();
}
@@ -789,6 +826,11 @@ class ObjectAppendStream final : public io::OutputStream {
Status Flush() override {
RETURN_NOT_OK(CheckClosed("flush"));
+ if (!initialised_) {
+ // If the stream has not been successfully initialized then there is
nothing to
+ // flush. This also avoids some unhandled errors when flushing in the
destructor.
+ return Status::OK();
+ }
return CommitBlockList(block_blob_client_, block_ids_, metadata_);
}
@@ -840,10 +882,11 @@ class ObjectAppendStream final : public io::OutputStream {
std::shared_ptr<Blobs::BlockBlobClient> block_blob_client_;
const io::IOContext io_context_;
const AzureLocation location_;
+ int64_t content_length_ = kNoSize;
bool closed_ = false;
+ bool initialised_ = false;
int64_t pos_ = 0;
- int64_t content_length_ = kNoSize;
std::vector<std::string> block_ids_;
Storage::Metadata metadata_;
};
@@ -1666,20 +1709,32 @@ class AzureFileSystem::Impl {
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));
+ const auto blob_container_client =
GetBlobContainerClient(location.container);
auto block_blob_client = std::make_shared<Blobs::BlockBlobClient>(
- blob_service_client_->GetBlobContainerClient(location.container)
- .GetBlockBlobClient(location.path));
+ blob_container_client.GetBlockBlobClient(location.path));
+
+ auto ensure_not_flat_namespace_directory = [this, location,
+ blob_container_client]() ->
Status {
+ ARROW_ASSIGN_OR_RAISE(
+ auto hns_support,
+
HierarchicalNamespaceSupport(GetFileSystemClient(location.container)));
+ if (hns_support == HNSSupport::kDisabled) {
+ // Flat namespace so we need to GetFileInfo in-case its a directory.
+ ARROW_ASSIGN_OR_RAISE(auto status, GetFileInfo(blob_container_client,
location))
+ if (status.type() == FileType::Directory) {
+ return NotAFile(location);
+ }
+ }
+ // kContainerNotFound - it doesn't exist, so no need to check if its a
directory.
+ // kEnabled - hierarchical namespace so Azure APIs will fail if its a
directory. We
+ // don't need to explicitly check.
+ return Status::OK();
+ };
std::shared_ptr<ObjectAppendStream> stream;
- if (truncate) {
- RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client));
- stream = std::make_shared<ObjectAppendStream>(block_blob_client,
fs->io_context(),
- location, metadata,
options_, 0);
- } else {
- stream = std::make_shared<ObjectAppendStream>(block_blob_client,
fs->io_context(),
- location, metadata,
options_);
- }
- RETURN_NOT_OK(stream->Init());
+ stream = std::make_shared<ObjectAppendStream>(block_blob_client,
fs->io_context(),
+ location, metadata,
options_);
+ RETURN_NOT_OK(stream->Init(truncate, ensure_not_flat_namespace_directory));
return stream;
}
@@ -1694,7 +1749,7 @@ class AzureFileSystem::Impl {
// on directory marker blobs.
//
https://github.com/fsspec/adlfs/blob/32132c4094350fca2680155a5c236f2e9f991ba5/adlfs/spec.py#L855-L870
Blobs::UploadBlockBlobFromOptions blob_options;
- blob_options.Metadata.emplace("is_directory", "true");
+ blob_options.Metadata.emplace(kFlatNamespaceIsDirectoryMetadataKey,
"true");
block_blob_client.UploadFrom(nullptr, 0, blob_options);
}
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc
b/cpp/src/arrow/filesystem/azurefs_test.cc
index 7f5cd247a8..f21876f03c 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -838,6 +838,41 @@ class TestAzureFileSystem : public ::testing::Test {
AssertFileInfo(fs(), subdir3, FileType::Directory);
}
+ void TestDisallowReadingOrWritingDirectoryMarkers() {
+ auto data = SetUpPreexistingData();
+ auto directory_path = data.Path("directory");
+
+ ASSERT_OK(fs()->CreateDir(directory_path));
+ ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path));
+ ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path));
+ ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path));
+
+ auto directory_path_with_slash = directory_path + "/";
+ ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path_with_slash));
+ ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path_with_slash));
+ ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path_with_slash));
+ }
+
+ void TestDisallowCreatingFileAndDirectoryWithTheSameName() {
+ auto data = SetUpPreexistingData();
+ auto path1 = data.Path("directory1");
+ ASSERT_OK(fs()->CreateDir(path1));
+ ASSERT_RAISES(IOError, fs()->OpenOutputStream(path1));
+ ASSERT_RAISES(IOError, fs()->OpenAppendStream(path1));
+ AssertFileInfo(fs(), path1, FileType::Directory);
+
+ auto path2 = data.Path("directory2");
+ ASSERT_OK(fs()->OpenOutputStream(path2));
+ // CreateDir returns OK even if there is already a file or directory at
this
+ // location. Whether or not this is the desired behaviour is debatable.
+ ASSERT_OK(fs()->CreateDir(path2));
+ AssertFileInfo(fs(), path2, FileType::File);
+ }
+
+ void TestOpenOutputStreamWithMissingContainer() {
+ ASSERT_RAISES(IOError, fs()->OpenOutputStream("not-a-container/file", {}));
+ }
+
void TestDeleteDirSuccessEmpty() {
if (HasSubmitBatchBug()) {
GTEST_SKIP() << kSubmitBatchBugMessage;
@@ -1665,6 +1700,19 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios,
CreateDirOnMissingContainer) {
this->TestCreateDirOnMissingContainer();
}
+TYPED_TEST(TestAzureFileSystemOnAllScenarios,
DisallowReadingOrWritingDirectoryMarkers) {
+ this->TestDisallowReadingOrWritingDirectoryMarkers();
+}
+
+TYPED_TEST(TestAzureFileSystemOnAllScenarios,
+ DisallowCreatingFileAndDirectoryWithTheSameName) {
+ this->TestDisallowCreatingFileAndDirectoryWithTheSameName();
+}
+
+TYPED_TEST(TestAzureFileSystemOnAllScenarios,
OpenOutputStreamWithMissingContainer) {
+ this->TestOpenOutputStreamWithMissingContainer();
+}
+
TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) {
this->TestDeleteDirSuccessEmpty();
}
@@ -2232,6 +2280,18 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) {
.Value.Metadata;
// Defaults are overwritten and not merged.
EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "foo")},
blob_metadata);
+
+ // Metadata can be written without writing any data.
+ ASSERT_OK_AND_ASSIGN(
+ output, fs_with_defaults->OpenAppendStream(
+ full_path, /*metadata=*/arrow::key_value_metadata({{"bar",
"baz"}})));
+ ASSERT_OK(output->Close());
+ blob_metadata =
blob_service_client_->GetBlobContainerClient(data.container_name)
+ .GetBlockBlobClient(blob_path)
+ .GetProperties()
+ .Value.Metadata;
+ // Defaults are overwritten and not merged.
+ EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")},
blob_metadata);
}
TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {