kou commented on code in PR #38505:
URL: https://github.com/apache/arrow/pull/38505#discussion_r1385857556
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -453,27 +457,137 @@ class ObjectInputFile final : public
io::RandomAccessFile {
class AzureFileSystem::Impl {
public:
io::IOContext io_context_;
- std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_;
+ std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeServiceClient>
+ datalake_service_client_;
+ std::unique_ptr<Azure::Storage::Blobs::BlobServiceClient>
blob_service_client_;
AzureOptions options_;
+ internal::HierarchicalNamespaceDetector hierarchical_namespace_;
Review Comment:
It seems that `HierarchicalNamespaceDetector` is simple enough to move into
`Impl`. (`HierarchicalNamespaceDetector::Enabled()` is the only important
method in the class.)
How about moving `HierarchicalNamespaceDetector::Enabled()` to
`Impl::IsHierarchicalNamespaceEnabled()` and removing
`HierarchicalNamespaceDetector` (or something)?
If we do that, we can make `datalake_service_client_` a `std::unique_ptr`.
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -146,9 +148,8 @@ Status ValidateFilePath(const AzurePath& path) {
return Status::OK();
}
-Status ErrorToStatus(const std::string& prefix,
- const Azure::Storage::StorageException& exception) {
- return Status::IOError(prefix, " Azure Error: ", exception.what());
+bool ContainerOrBlobNotFound(const Azure::Storage::StorageException&
exception) {
Review Comment:
I can understand this naming, but I feel that it doesn't add much
information.
How about just using `IsNotFoundStatus()`, `IsNotFoundException()` or
something?
(Or we don't need to add this helper function at all, because `exception.StatusCode
== Azure::Core::Http::HttpStatusCode::NotFound` is easy enough to understand
and straightforward.)
##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -216,23 +227,223 @@ class TestAzureFileSystem : public ::testing::Test {
void UploadLines(const std::vector<std::string>& lines, const char*
path_to_file,
int total_size) {
// TODO(GH-38333): Switch to using Azure filesystem to write once its
implemented.
- auto blob_client =
service_client_->GetBlobContainerClient(PreexistingContainerName())
- .GetBlockBlobClient(path_to_file);
+ auto blob_client =
+
blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(path_to_file);
std::string all_lines = std::accumulate(lines.begin(), lines.end(),
std::string(""));
blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(all_lines.data()),
total_size);
}
};
-TEST_F(TestAzureFileSystem, OpenInputStreamString) {
+class AzuriteFileSystemTest : public AzureFileSystemTest {
+ Result<AzureOptions> MakeOptions() {
+ EXPECT_THAT(GetAzuriteEnv(), NotNull());
+ ARROW_EXPECT_OK(GetAzuriteEnv()->status());
+ AzureOptions options;
+ options.backend = AzureBackend::Azurite;
+ ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(
+ GetAzuriteEnv()->account_name(), GetAzuriteEnv()->account_key()));
+ return options;
+ }
+};
+
+class AzureFlatNamespaceFileSystemTest : public AzureFileSystemTest {
+ Result<AzureOptions> MakeOptions() override {
+ AzureOptions options;
+ if (char* account_name = std::getenv("AZURE_FLAT_NAMESPACE_ACCOUNT_NAME"))
{
+ char* account_key = std::getenv("AZURE_FLAT_NAMESPACE_ACCOUNT_KEY");
+ EXPECT_THAT(account_key, NotNull());
+ ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(account_name,
account_key));
+ return options;
+ }
+ return Status::Cancelled(
+ "Connection details not provided for a real flat namespace "
+ "account.");
+ }
+};
+
+class AzureHierarchicalNamespaceFileSystemTest : public AzureFileSystemTest {
+ Result<AzureOptions> MakeOptions() override {
+ AzureOptions options;
+ if (char* account_name =
std::getenv("AZURE_HIERARCHICAL_NAMESPACE_ACCOUNT_NAME")) {
+ char* account_key =
std::getenv("AZURE_HIERARCHICAL_NAMESPACE_ACCOUNT_KEY");
+ EXPECT_THAT(account_key, NotNull());
+ ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(account_name,
account_key));
+ return options;
+ }
+ return Status::Cancelled(
+ "Connection details not provided for a real hierachical namespace "
+ "account.");
+ }
+};
+
+TEST_F(AzureFlatNamespaceFileSystemTest, DetectHierarchicalNamespace) {
+ auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
+ ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_));
+ ASSERT_OK_AND_EQ(false,
hierarchical_namespace.Enabled(PreexistingContainerName()));
+}
+
+TEST_F(AzureHierarchicalNamespaceFileSystemTest, DetectHierarchicalNamespace) {
+ auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
+ ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_));
+ ASSERT_OK_AND_EQ(true,
hierarchical_namespace.Enabled(PreexistingContainerName()));
+}
+
+TEST_F(AzuriteFileSystemTest, DetectHierarchicalNamespace) {
+ auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
+ ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_));
+ ASSERT_OK_AND_EQ(false,
hierarchical_namespace.Enabled(PreexistingContainerName()));
+}
+
+TEST_F(AzuriteFileSystemTest,
DetectHierarchicalNamespaceFailsWithMissingContainer) {
+ auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
+ ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_));
+ ASSERT_NOT_OK(hierarchical_namespace.Enabled("non-existent-container"));
+}
+
+TEST_F(AzuriteFileSystemTest, GetFileInfoAccount) {
+ arrow::fs::AssertFileInfo(fs_.get(), "", FileType::Directory);
+
+ // URI
+ ASSERT_RAISES(Invalid, fs_->GetFileInfo("abfs://"));
+}
+
+TEST_F(AzuriteFileSystemTest, GetFileInfoContainer) {
+ arrow::fs::AssertFileInfo(fs_.get(), PreexistingContainerName(),
FileType::Directory);
+
+ arrow::fs::AssertFileInfo(fs_.get(), "non-existent-container",
FileType::NotFound);
+
+ // URI
+ ASSERT_RAISES(Invalid, fs_->GetFileInfo("abfs://" +
PreexistingContainerName()));
+}
+
+TEST_F(AzuriteFileSystemTest, GetFileInfoObjectWithNestedStructure) {
+ // Adds detailed tests to handle cases of different edge cases
+ // with directory naming conventions (e.g. with and without slashes).
+ constexpr auto kObjectName =
"test-object-dir/some_other_dir/another_dir/foo";
+ // TODO(GH-38333): Switch to using Azure filesystem to write once its
implemented.
+ blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(kObjectName)
+ .UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum),
strlen(kLoremIpsum));
+
+ // 0 is immediately after "/" lexicographically, ensure that this doesn't
+ // cause unexpected issues.
+ // TODO(GH-38333): Switch to using Azure filesystem to write once its
implemented.
+ blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient("test-object-dir/some_other_dir0")
+ .UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum),
strlen(kLoremIpsum));
+
+ blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(std::string(kObjectName) + "0")
+ .UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum),
strlen(kLoremIpsum));
+
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName,
FileType::File);
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName + "/",
+ FileType::NotFound);
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-dir",
+ FileType::Directory);
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-dir/",
+ FileType::Directory);
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() +
"test-object-dir/some_other_dir",
+ FileType::Directory);
+ AssertFileInfo(fs_.get(),
+ PreexistingContainerPath() +
"test-object-dir/some_other_dir/",
+ FileType::Directory);
+
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-di",
+ FileType::NotFound);
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() +
"test-object-dir/some_other_di",
+ FileType::NotFound);
+}
+
+TEST_F(AzureHierarchicalNamespaceFileSystemTest,
GetFileInfoObjectWithNestedStructure) {
+ // Adds detailed tests to handle cases of different edge cases
+ // with directory naming conventions (e.g. with and without slashes).
+ constexpr auto kObjectName =
"test-object-dir/some_other_dir/another_dir/foo";
+ // TODO(GH-38333): Switch to using Azure filesystem to write once its
implemented.
+ blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(kObjectName)
+ .UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum),
strlen(kLoremIpsum));
+
+ // 0 is immediately after "/" lexicographically, ensure that this doesn't
+ // cause unexpected issues.
+ // TODO(GH-38333): Switch to using Azure filesystem to write once its
implemented.
+ blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient("test-object-dir/some_other_dir0")
+ .UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum),
strlen(kLoremIpsum));
+
+ blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(std::string(kObjectName) + "0")
+ .UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum),
strlen(kLoremIpsum));
+
+ datalake_service_client_->GetFileSystemClient(PreexistingContainerName())
+ .GetDirectoryClient("test-empty-object-dir")
+ .Create();
+
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName,
FileType::File);
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName + "/",
+ FileType::NotFound);
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-dir",
+ FileType::Directory);
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-dir/",
+ FileType::Directory);
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() +
"test-object-dir/some_other_dir",
+ FileType::Directory);
+ AssertFileInfo(fs_.get(),
+ PreexistingContainerPath() +
"test-object-dir/some_other_dir/",
+ FileType::Directory);
+
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-di",
+ FileType::NotFound);
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() +
"test-object-dir/some_other_di",
+ FileType::NotFound);
+
+ AssertFileInfo(fs_.get(), PreexistingContainerPath() +
"test-empty-object-dir",
+ FileType::Directory);
Review Comment:
We can do it by adding an internal `ListBlobs` call counter and exporting it
only for testing.
Or we may be able to provide `AzureFileSystem::GetStatistics()`, whose
return value provides statistics including the number of `ListBlobs` calls.
(I think that we don't need to test it. If we want to test it, we can open a
new issue for it and defer it as a separate task, so that this can be merged as
soon as possible.)
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -453,27 +457,137 @@ class ObjectInputFile final : public
io::RandomAccessFile {
class AzureFileSystem::Impl {
public:
io::IOContext io_context_;
- std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_;
+ std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeServiceClient>
+ datalake_service_client_;
+ std::unique_ptr<Azure::Storage::Blobs::BlobServiceClient>
blob_service_client_;
AzureOptions options_;
+ internal::HierarchicalNamespaceDetector hierarchical_namespace_;
explicit Impl(AzureOptions options, io::IOContext io_context)
: io_context_(io_context), options_(std::move(options)) {}
Status Init() {
- service_client_ =
std::make_shared<Azure::Storage::Blobs::BlobServiceClient>(
+ blob_service_client_ =
std::make_unique<Azure::Storage::Blobs::BlobServiceClient>(
options_.account_blob_url, options_.storage_credentials_provider);
+ datalake_service_client_ =
+
std::make_shared<Azure::Storage::Files::DataLake::DataLakeServiceClient>(
+ options_.account_dfs_url, options_.storage_credentials_provider);
+ RETURN_NOT_OK(hierarchical_namespace_.Init(datalake_service_client_));
return Status::OK();
}
const AzureOptions& options() const { return options_; }
+ public:
+ Result<FileInfo> GetFileInfo(const AzurePath& path) {
+ FileInfo info;
+ info.set_path(path.full_path);
+
+ if (path.container.empty()) {
+ DCHECK(path.path_to_file.empty()); // The path is invalid if the
container is empty
+ // but not path_to_file.
+ // path must refer to the root of the Azure storage account. This is a
directory,
+ // and there isn't any extra metadata to fetch.
+ info.set_type(FileType::Directory);
+ return info;
+ }
+ if (path.path_to_file.empty()) {
+ // path refers to a container. This is a directory if it exists.
+ auto container_client =
+ blob_service_client_->GetBlobContainerClient(path.container);
+ try {
+ auto properties = container_client.GetProperties();
+ info.set_type(FileType::Directory);
+ info.set_mtime(
+
std::chrono::system_clock::time_point(properties.Value.LastModified));
+ return info;
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (ContainerOrBlobNotFound(exception)) {
+ info.set_type(FileType::NotFound);
+ return info;
+ }
+ return internal::ExceptionToStatus(
+ "GetProperties for '" + container_client.GetUrl() +
+ "' failed with an unexpected Azure error. GetFileInfo is
unable to "
+ "determine whether the container exists.",
+ exception);
+ }
+ }
+ auto file_client =
datalake_service_client_->GetFileSystemClient(path.container)
+ .GetFileClient(path.path_to_file);
+ try {
+ auto properties = file_client.GetProperties();
+ if (properties.Value.IsDirectory) {
+ info.set_type(FileType::Directory);
+ } else if (internal::HasTrailingSlash(path.path_to_file)) {
+ // For a path with a trailing slash a hierarchical namespace may
return a blob
+ // with that trailing slash removed. For consistency with flat
namespace and
+ // other filesystems we chose to return NotFound.
+ info.set_type(FileType::NotFound);
+ return info;
+ } else {
+ info.set_type(FileType::File);
+ info.set_size(properties.Value.FileSize);
+ }
+ info.set_mtime(
+
std::chrono::system_clock::time_point(properties.Value.LastModified));
+ return info;
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (ContainerOrBlobNotFound(exception)) {
+ ARROW_ASSIGN_OR_RAISE(bool hierarchical_namespace_enabled,
+ hierarchical_namespace_.Enabled(path.container));
+ if (hierarchical_namespace_enabled) {
+ // If the hierarchical namespace is enabled, then the storage
account will have
+ // explicit directories. Neither a file nor a directory was found.
+ info.set_type(FileType::NotFound);
+ return info;
+ }
+ // On flat namespace accounts there are no real directories.
Directories are only
+ // implied by using `/` in the blob name.
+ Azure::Storage::Blobs::ListBlobsOptions list_blob_options;
+
+ // If listing the prefix `path.path_to_file` with trailing slash
returns at least
+ // one result then `path` refers to an implied directory.
+ auto prefix = internal::EnsureTrailingSlash(path.path_to_file);
+ list_blob_options.Prefix = prefix;
+ // We only need to know if there is at least one result, so minimise
page size
+ // for efficiency.
+ list_blob_options.PageSizeHint = 1;
+
+ try {
+ auto paged_list_result =
+ blob_service_client_->GetBlobContainerClient(path.container)
+ .ListBlobs(list_blob_options);
+ if (paged_list_result.Blobs.size() > 0) {
+ info.set_type(FileType::Directory);
+ return info;
+ } else {
+ info.set_type(FileType::NotFound);
+ return info;
+ }
Review Comment:
How about simplifying this?
```suggestion
} else {
info.set_type(FileType::NotFound);
}
return info;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]