This is an automated email from the ASF dual-hosted git repository.

felipecrv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 726568936e GH-39297: [C++][FS]: Inform caller of container not-existing when checking for HNS support (#39298)
726568936e is described below

commit 726568936e345ee2a15d3a5ad5d654e14939d673
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Wed Dec 20 10:42:46 2023 -0300

    GH-39297: [C++][FS]: Inform caller of container not-existing when checking for HNS support (#39298)
    
    ### Rationale for this change
    
    An operation that checks for Hierarchical Namespace (HNS) support shouldn't
    fail outright just because the container doesn't exist. By returning a
    result that indicates the check didn't succeed for that reason, we let the
    caller decide what to do in that situation.
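
    As an illustrative sketch (mirroring the pattern used in azurefs.cc further
    down in this diff), a caller can branch on the result of the check and
    decide how to treat a missing container; adlfs_client, location, and
    PathNotFound are the names used by the implementation below:

        ARROW_ASSIGN_OR_RAISE(auto hns_support,
                              HierarchicalNamespaceSupport(adlfs_client));
        if (hns_support == HNSSupport::kContainerNotFound) {
          // The caller decides: e.g. CreateDir reports PathNotFound, while
          // DeleteDirContents(missing_dir_ok=true) simply returns OK.
          return PathNotFound(location);
        }
        if (hns_support == HNSSupport::kDisabled) {
          // Flat-namespace accounts have no real directories; a blob name
          // containing '/' is enough to imply them.
          return Status::OK();
        }
        // HNSSupport::kEnabled: use the ADLS Gen2 directory client APIs.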
    
    ### What changes are included in this PR?
    
     - Removal of the `azurefs_internal.h/.cc` files
     - Implementation of the check as a free function instead of a class (see
       the sketch after this list)
     - Memoization of the result in the `AzureFileSystem` class
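
    A hedged sketch of the free-function form (how the DataLake service
    client and the AzureOptions are obtained is assumed here; the call itself
    matches the updated tests in this diff):

        auto adlfs_client =
            datalake_service_client->GetFileSystemClient(container_name);
        ARROW_ASSIGN_OR_RAISE(
            auto hns_support,
            internal::CheckIfHierarchicalNamespaceIsEnabled(adlfs_client, options));
        // kUnknown is never returned; AzureFileSystem::Impl memoizes
        // kEnabled/kDisabled and re-runs the check after kContainerNotFound.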
    
    ### Are these changes tested?
    
    Yes. The tests were improved to cover all cases.
    * Closes: #39297
    
    Authored-by: Felipe Oliveira Carvalho <[email protected]>
    Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
---
 cpp/src/arrow/CMakeLists.txt                 |   4 +-
 cpp/src/arrow/filesystem/azurefs.cc          | 320 +++++++++++++++++----------
 cpp/src/arrow/filesystem/azurefs.h           |  51 ++++-
 cpp/src/arrow/filesystem/azurefs_internal.cc |  94 --------
 cpp/src/arrow/filesystem/azurefs_internal.h  |  39 ----
 cpp/src/arrow/filesystem/azurefs_test.cc     |  64 ++++--
 6 files changed, 309 insertions(+), 263 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 00947c6275..c1fafeebc0 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -506,8 +506,8 @@ if(ARROW_FILESYSTEM)
        filesystem/util_internal.cc)
 
   if(ARROW_AZURE)
-    list(APPEND ARROW_SRCS filesystem/azurefs.cc filesystem/azurefs_internal.cc)
-    set_source_files_properties(filesystem/azurefs.cc filesystem/azurefs_internal.cc
+    list(APPEND ARROW_SRCS filesystem/azurefs.cc)
+    set_source_files_properties(filesystem/azurefs.cc
                                 PROPERTIES SKIP_PRECOMPILE_HEADERS ON
                                            SKIP_UNITY_BUILD_INCLUSION ON)
   endif()
diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc
index 1aa3e86a6f..032cd034e7 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -16,7 +16,6 @@
 // under the License.
 
 #include "arrow/filesystem/azurefs.h"
-#include "arrow/filesystem/azurefs_internal.h"
 
 #include <azure/identity.hpp>
 #include <azure/storage/blobs.hpp>
@@ -42,6 +41,8 @@ namespace DataLake = Azure::Storage::Files::DataLake;
 namespace Http = Azure::Core::Http;
 namespace Storage = Azure::Storage;
 
+using internal::HNSSupport;
+
 // -----------------------------------------------------------------------
 // AzureOptions Implementation
 
@@ -263,9 +264,11 @@ Status StatusFromErrorResponse(const std::string& url,
                          "): ", body_text);
 }
 
-bool IsContainerNotFound(const Storage::StorageException& exception) {
-  if (exception.ErrorCode == "ContainerNotFound") {
-    DCHECK_EQ(exception.StatusCode, Http::HttpStatusCode::NotFound);
+bool IsContainerNotFound(const Storage::StorageException& e) {
+  if (e.ErrorCode == "ContainerNotFound" ||
+      e.ReasonPhrase == "The specified container does not exist." ||
+      e.ReasonPhrase == "The specified filesystem does not exist.") {
+    DCHECK_EQ(e.StatusCode, Http::HttpStatusCode::NotFound);
     return true;
   }
   return false;
@@ -441,8 +444,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
       }
       return ExceptionToStatus(
           "GetProperties failed for '" + blob_client_->GetUrl() +
-              "' with an unexpected Azure error. Cannot initialise an ObjectInputFile "
-              "without knowing the file size.",
+              "'. Cannot initialise an ObjectInputFile without knowing the file size.",
           exception);
     }
   }
@@ -520,12 +522,11 @@ class ObjectInputFile final : public io::RandomAccessFile {
          ->DownloadTo(reinterpret_cast<uint8_t*>(out), nbytes, download_options)
           .Value.ContentRange.Length.Value();
     } catch (const Storage::StorageException& exception) {
-      return ExceptionToStatus("DownloadTo from '" + blob_client_->GetUrl() +
-                                   "' at position " + std::to_string(position) + " for " +
-                                   std::to_string(nbytes) +
-                                   " bytes failed with an Azure error. ReadAt "
-                                   "failed to read the required byte range.",
-                               exception);
+      return ExceptionToStatus(
+          "DownloadTo from '" + blob_client_->GetUrl() + "' at position " +
+              std::to_string(position) + " for " + std::to_string(nbytes) +
+              " bytes failed. ReadAt failed to read the required byte range.",
+          exception);
     }
   }
 
@@ -576,9 +577,8 @@ Status CreateEmptyBlockBlob(std::shared_ptr<Blobs::BlockBlobClient> block_blob_c
   } catch (const Storage::StorageException& exception) {
     return ExceptionToStatus(
         "UploadFrom failed for '" + block_blob_client->GetUrl() +
-            "' with an unexpected Azure error. There is no existing blob at this "
-            "location or the existing blob must be replaced so ObjectAppendStream must "
-            "create a new empty block blob.",
+            "'. There is no existing blob at this location or the existing blob must be "
+            "replaced so ObjectAppendStream must create a new empty block blob.",
         exception);
   }
   return Status::OK();
@@ -591,8 +591,7 @@ Result<Blobs::Models::GetBlockListResult> GetBlockList(
   } catch (Storage::StorageException& exception) {
     return ExceptionToStatus(
         "GetBlockList failed for '" + block_blob_client->GetUrl() +
-            "' with an unexpected Azure error. Cannot write to a file without first "
-            "fetching the existing block list.",
+            "'. Cannot write to a file without first fetching the existing block list.",
         exception);
   }
 }
@@ -620,8 +619,7 @@ Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_bl
   } catch (const Storage::StorageException& exception) {
     return ExceptionToStatus(
         "CommitBlockList failed for '" + block_blob_client->GetUrl() +
-            "' with an unexpected Azure error. Committing is required to flush an "
-            "output/append stream.",
+            "'. Committing is required to flush an output/append stream.",
         exception);
   }
   return Status::OK();
@@ -665,9 +663,8 @@ class ObjectAppendStream final : public io::OutputStream {
         } else {
           return ExceptionToStatus(
               "GetProperties failed for '" + block_blob_client_->GetUrl() +
-                  "' with an unexpected Azure error. Cannot initialise an "
-                  "ObjectAppendStream without knowing whether a file already exists at "
-                  "this path, and if it exists, its size.",
+                  "'. Cannot initialise an ObjectAppendStream without knowing whether a "
+                  "file already exists at this path, and if it exists, its size.",
               exception);
         }
         content_length_ = 0;
@@ -765,8 +762,7 @@ class ObjectAppendStream final : public io::OutputStream {
       return ExceptionToStatus(
           "StageBlock failed for '" + block_blob_client_->GetUrl() + "' 
new_block_id: '" +
               new_block_id +
-              "' with an unexpected Azure error. Staging new blocks is fundamental to "
-              "streaming writes to blob storage.",
+              "'. Staging new blocks is fundamental to streaming writes to blob storage.",
           exception);
     }
     block_ids_.push_back(new_block_id);
@@ -786,11 +782,116 @@ class ObjectAppendStream final : public io::OutputStream {
   Storage::Metadata metadata_;
 };
 
+bool IsDfsEmulator(const AzureOptions& options) {
+  return options.dfs_storage_authority != ".dfs.core.windows.net";
+}
+
 }  // namespace
 
+// -----------------------------------------------------------------------
+// internal implementation
+
+namespace internal {
+
+Result<HNSSupport> CheckIfHierarchicalNamespaceIsEnabled(
+    DataLake::DataLakeFileSystemClient& adlfs_client, const AzureOptions& options) {
+  try {
+    auto directory_client = adlfs_client.GetDirectoryClient("");
+    // GetAccessControlList will fail on storage accounts
+    // without hierarchical namespace enabled.
+    directory_client.GetAccessControlList();
+    return HNSSupport::kEnabled;
+  } catch (std::out_of_range& exception) {
+    // Azurite issue detected.
+    DCHECK(IsDfsEmulator(options));
+    return HNSSupport::kDisabled;
+  } catch (const Storage::StorageException& exception) {
+    // Flat namespace storage accounts with "soft delete" enabled return
+    //
+    //   "Conflict - This endpoint does not support BlobStorageEvents
+    //   or SoftDelete. [...]" [1],
+    //
+    // otherwise it returns:
+    //
+    //   "BadRequest - This operation is only supported on a hierarchical namespace
+    //   account."
+    //
+    // [1]:
+    // https://learn.microsoft.com/en-us/answers/questions/1069779/this-endpoint-does-not-support-blobstorageevents-o
+    switch (exception.StatusCode) {
+      case Http::HttpStatusCode::BadRequest:
+      case Http::HttpStatusCode::Conflict:
+        return HNSSupport::kDisabled;
+      case Http::HttpStatusCode::NotFound:
+        if (IsDfsEmulator(options)) {
+          return HNSSupport::kDisabled;
+        }
+        // Did we get an error because of the container not existing?
+        if (IsContainerNotFound(exception)) {
+          return HNSSupport::kContainerNotFound;
+        }
+        [[fallthrough]];
+      default:
+        if (exception.ErrorCode == "HierarchicalNamespaceNotEnabled") {
+          return HNSSupport::kDisabled;
+        }
+        return ExceptionToStatus("Check for Hierarchical Namespace support on '" +
+                                     adlfs_client.GetUrl() + "' failed.",
+                                 exception);
+    }
+  }
+}
+
+}  // namespace internal
+
 // -----------------------------------------------------------------------
 // AzureFilesystem Implementation
 
+namespace {
+
+// In Azure Storage terminology, a "container" and a "filesystem" are the same
+// kind of object, but it can be accessed using different APIs. The Blob Storage
+// API calls it a "container", the Data Lake Storage Gen 2 API calls it a
+// "filesystem". Creating a container using the Blob Storage API will make it
+// accessible using the Data Lake Storage Gen 2 API and vice versa.
+
+template <class ContainerClient>
+Result<FileInfo> GetContainerPropsAsFileInfo(const std::string& container_name,
+                                             ContainerClient& container_client) {
+  FileInfo info{container_name};
+  try {
+    auto properties = container_client.GetProperties();
+    info.set_type(FileType::Directory);
+    info.set_mtime(std::chrono::system_clock::time_point{properties.Value.LastModified});
+    return info;
+  } catch (const Storage::StorageException& exception) {
+    if (IsContainerNotFound(exception)) {
+      info.set_type(FileType::NotFound);
+      return info;
+    }
+    return ExceptionToStatus(
+        "GetProperties for '" + container_client.GetUrl() + "' failed.", exception);
+  }
+}
+
+FileInfo DirectoryFileInfoFromPath(std::string_view path) {
+  return FileInfo{std::string{internal::RemoveTrailingSlash(path)}, FileType::Directory};
+}
+
+FileInfo FileInfoFromBlob(std::string_view container,
+                          const Blobs::Models::BlobItem& blob) {
+  auto path = internal::ConcatAbstractPath(container, blob.Name);
+  if (internal::HasTrailingSlash(blob.Name)) {
+    return DirectoryFileInfoFromPath(path);
+  }
+  FileInfo info{std::move(path), FileType::File};
+  info.set_size(blob.BlobSize);
+  info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
+  return info;
+}
+
+}  // namespace
+
 class AzureFileSystem::Impl {
  private:
   io::IOContext io_context_;
@@ -798,7 +899,7 @@ class AzureFileSystem::Impl {
 
   std::unique_ptr<DataLake::DataLakeServiceClient> datalake_service_client_;
   std::unique_ptr<Blobs::BlobServiceClient> blob_service_client_;
-  internal::HierarchicalNamespaceDetector hns_detector_;
+  HNSSupport cached_hns_support_ = HNSSupport::kUnknown;
 
   Impl(AzureOptions options, io::IOContext io_context)
       : io_context_(std::move(io_context)), options_(std::move(options)) {}
@@ -812,52 +913,54 @@ class AzureFileSystem::Impl {
                           self->options_.MakeBlobServiceClient());
     ARROW_ASSIGN_OR_RAISE(self->datalake_service_client_,
                           self->options_.MakeDataLakeServiceClient());
-    RETURN_NOT_OK(self->hns_detector_.Init(self->datalake_service_client_.get()));
     return self;
   }
 
   io::IOContext& io_context() { return io_context_; }
   const AzureOptions& options() const { return options_; }
 
+ private:
+  /// \brief Memoized version of CheckIfHierarchicalNamespaceIsEnabled.
+  ///
+  /// \return kEnabled/kDisabled/kContainerNotFound (kUnknown is never returned).
+  Result<HNSSupport> HierarchicalNamespaceSupport(
+      DataLake::DataLakeFileSystemClient& adlfs_client) {
+    switch (cached_hns_support_) {
+      case HNSSupport::kEnabled:
+      case HNSSupport::kDisabled:
+        return cached_hns_support_;
+      case HNSSupport::kUnknown:
+      case HNSSupport::kContainerNotFound:
+        // Try the check again because the support is still unknown or the container
+        // that didn't exist before may exist now.
+        break;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        cached_hns_support_,
+        internal::CheckIfHierarchicalNamespaceIsEnabled(adlfs_client, options_));
+    DCHECK_NE(cached_hns_support_, HNSSupport::kUnknown);
+    // Caller should handle kContainerNotFound case appropriately.
+    return cached_hns_support_;
+  }
+
  public:
   Result<FileInfo> GetFileInfo(const AzureLocation& location) {
-    FileInfo info;
-    info.set_path(location.all);
-
     if (location.container.empty()) {
-      // The location is invalid if the container is empty but the path is not.
       DCHECK(location.path.empty());
-      // This location must be derived from the root path. FileInfo should describe it
-      // as a directory and there isn't any extra metadata to fetch.
-      info.set_type(FileType::Directory);
-      return info;
+      // Root directory of the storage account.
+      return FileInfo{"", FileType::Directory};
     }
     if (location.path.empty()) {
-      // The location refers to a container. This is a directory if it exists.
+      // We have a container, but no path within the container.
+      // The container itself represents a directory.
       auto container_client =
           blob_service_client_->GetBlobContainerClient(location.container);
-      try {
-        auto properties = container_client.GetProperties();
-        info.set_type(FileType::Directory);
-        info.set_mtime(
-            std::chrono::system_clock::time_point{properties.Value.LastModified});
-        return info;
-      } catch (const Storage::StorageException& exception) {
-        if (IsContainerNotFound(exception)) {
-          info.set_type(FileType::NotFound);
-          return info;
-        }
-        return ExceptionToStatus(
-            "GetProperties for '" + container_client.GetUrl() +
-                "' failed with an unexpected Azure error. GetFileInfo is unable to "
-                "determine whether the container exists.",
-            exception);
-      }
+      return GetContainerPropsAsFileInfo(location.container, container_client);
     }
-
     // There is a path to search within the container.
-    auto file_client = datalake_service_client_->GetFileSystemClient(location.container)
-                           .GetFileClient(location.path);
+    FileInfo info{location.all};
+    auto adlfs_client = datalake_service_client_->GetFileSystemClient(location.container);
+    auto file_client = adlfs_client.GetFileClient(location.path);
     try {
       auto properties = file_client.GetProperties();
       if (properties.Value.IsDirectory) {
@@ -879,11 +982,12 @@ class AzureFileSystem::Impl {
       return info;
     } catch (const Storage::StorageException& exception) {
       if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
-        ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
-                              hns_detector_.Enabled(location.container));
-        if (hierarchical_namespace_enabled) {
-          // If the hierarchical namespace is enabled, then the storage account will have
-          // explicit directories. Neither a file nor a directory was found.
+        ARROW_ASSIGN_OR_RAISE(auto hns_support,
+                              HierarchicalNamespaceSupport(adlfs_client));
+        if (hns_support == HNSSupport::kContainerNotFound ||
+            hns_support == HNSSupport::kEnabled) {
+          // If the hierarchical namespace is enabled, then the storage account will
+          // have explicit directories. Neither a file nor a directory was found.
           info.set_type(FileType::NotFound);
           return info;
         }
@@ -907,16 +1011,15 @@ class AzureFileSystem::Impl {
           return info;
         } catch (const Storage::StorageException& exception) {
           return ExceptionToStatus(
-              "ListBlobs for '" + *list_blob_options.Prefix +
-                  "' failed with an unexpected Azure error. GetFileInfo is unable to "
-                  "determine whether the path should be considered an implied directory.",
+              "ListBlobs failed for prefix='" + *list_blob_options.Prefix +
+                  "' failed. GetFileInfo is unable to determine whether the path should "
+                  "be considered an implied directory.",
               exception);
         }
       }
       return ExceptionToStatus(
-          "GetProperties for '" + file_client.GetUrl() +
-              "' failed with an unexpected "
-              "Azure error. GetFileInfo is unable to determine whether the path exists.",
+          "GetProperties failed for '" + file_client.GetUrl() +
+              "' GetFileInfo is unable to determine whether the path exists.",
           exception);
     }
   }
@@ -940,23 +1043,6 @@ class AzureFileSystem::Impl {
     return Status::OK();
   }
 
-  static FileInfo FileInfoFromBlob(std::string_view container,
-                                   const Blobs::Models::BlobItem& blob) {
-    auto path = internal::ConcatAbstractPath(container, blob.Name);
-    if (internal::HasTrailingSlash(blob.Name)) {
-      return DirectoryFileInfoFromPath(path);
-    }
-    FileInfo info{std::move(path), FileType::File};
-    info.set_size(blob.BlobSize);
-    info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
-    return info;
-  }
-
-  static FileInfo DirectoryFileInfoFromPath(std::string_view path) {
-    return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
-                    FileType::Directory};
-  }
-
   static std::string_view BasenameView(std::string_view s) {
     DCHECK(!internal::HasTrailingSlash(s));
     auto offset = s.find_last_of(internal::kSep);
@@ -1158,9 +1244,9 @@ class AzureFileSystem::Impl {
       return Status::Invalid("Cannot create an empty container");
     }
 
+    auto container_client =
+        blob_service_client_->GetBlobContainerClient(location.container);
     if (location.path.empty()) {
-      auto container_client =
-          blob_service_client_->GetBlobContainerClient(location.container);
       try {
         auto response = container_client.Create();
         if (response.Value.Created) {
@@ -1177,18 +1263,25 @@ class AzureFileSystem::Impl {
       }
     }
 
-    ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
-                          hns_detector_.Enabled(location.container));
-    if (!hierarchical_namespace_enabled) {
+    auto adlfs_client = datalake_service_client_->GetFileSystemClient(location.container);
+    ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client));
+    if (hns_support == HNSSupport::kContainerNotFound) {
+      return PathNotFound(location);
+    }
+    if (hns_support == HNSSupport::kDisabled) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto container_info,
+          GetContainerPropsAsFileInfo(location.container, container_client));
+      if (container_info.type() == FileType::NotFound) {
+        return PathNotFound(location);
+      }
      // Without hierarchical namespace enabled Azure blob storage has no directories.
      // Therefore we can't, and don't need to create one. Simply creating a blob with `/`
       // in the name implies directories.
       return Status::OK();
     }
 
-    auto directory_client =
-        datalake_service_client_->GetFileSystemClient(location.container)
-            .GetDirectoryClient(location.path);
+    auto directory_client = adlfs_client.GetDirectoryClient(location.path);
     try {
       auto response = directory_client.Create();
       if (response.Value.Created) {
@@ -1219,19 +1312,19 @@ class AzureFileSystem::Impl {
                                exception);
     }
 
-    ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
-                          hns_detector_.Enabled(location.container));
-    if (!hierarchical_namespace_enabled) {
+    auto adlfs_client = datalake_service_client_->GetFileSystemClient(location.container);
+    ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client));
+    if (hns_support == HNSSupport::kDisabled) {
      // Without hierarchical namespace enabled Azure blob storage has no directories.
      // Therefore we can't, and don't need to create one. Simply creating a blob with `/`
       // in the name implies directories.
       return Status::OK();
     }
+    // Don't handle HNSSupport::kContainerNotFound, just assume it still exists (because
+    // it was created above) and try to create the directory.
 
     if (!location.path.empty()) {
-      auto directory_client =
-          datalake_service_client_->GetFileSystemClient(location.container)
-              .GetDirectoryClient(location.path);
+      auto directory_client = adlfs_client.GetDirectoryClient(location.path);
       try {
         directory_client.CreateIfNotExists();
       } catch (const Storage::StorageException& exception) {
@@ -1344,6 +1437,12 @@ class AzureFileSystem::Impl {
       return Status::Invalid("Cannot delete an empty container");
     }
 
+    auto adlfs_client = datalake_service_client_->GetFileSystemClient(location.container);
+    ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client));
+    if (hns_support == HNSSupport::kContainerNotFound) {
+      return PathNotFound(location);
+    }
+
     if (location.path.empty()) {
       auto container_client =
           blob_service_client_->GetBlobContainerClient(location.container);
@@ -1363,12 +1462,8 @@ class AzureFileSystem::Impl {
       }
     }
 
-    ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
-                          hns_detector_.Enabled(location.container));
-    if (hierarchical_namespace_enabled) {
-      auto directory_client =
-          datalake_service_client_->GetFileSystemClient(location.container)
-              .GetDirectoryClient(location.path);
+    if (hns_support == HNSSupport::kEnabled) {
+      auto directory_client = adlfs_client.GetDirectoryClient(location.path);
       try {
         auto response = directory_client.DeleteRecursive();
         if (response.Value.Deleted) {
@@ -1394,19 +1489,20 @@ class AzureFileSystem::Impl {
       return internal::InvalidDeleteDirContents(location.all);
     }
 
-    ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
-                          hns_detector_.Enabled(location.container));
-    if (hierarchical_namespace_enabled) {
-      auto file_system_client =
-          datalake_service_client_->GetFileSystemClient(location.container);
-      auto directory_client = file_system_client.GetDirectoryClient(location.path);
+    auto adlfs_client = datalake_service_client_->GetFileSystemClient(location.container);
+    ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client));
+    if (hns_support == HNSSupport::kContainerNotFound) {
+      return missing_dir_ok ? Status::OK() : PathNotFound(location);
+    }
+
+    if (hns_support == HNSSupport::kEnabled) {
+      auto directory_client = adlfs_client.GetDirectoryClient(location.path);
       try {
         auto list_response = directory_client.ListPaths(false);
         for (; list_response.HasPage(); list_response.MoveToNextPage()) {
           for (const auto& path : list_response.Paths) {
             if (path.IsDirectory) {
-              auto sub_directory_client =
-                  file_system_client.GetDirectoryClient(path.Name);
+              auto sub_directory_client = adlfs_client.GetDirectoryClient(path.Name);
               try {
                 sub_directory_client.DeleteRecursive();
               } catch (const Storage::StorageException& exception) {
@@ -1416,7 +1512,7 @@ class AzureFileSystem::Impl {
                     exception);
               }
             } else {
-              auto sub_file_client = file_system_client.GetFileClient(path.Name);
+              auto sub_file_client = adlfs_client.GetFileClient(path.Name);
               try {
                 sub_file_client.Delete();
               } catch (const Storage::StorageException& exception) {
diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h
index 35c140b109..b7ef2bb313 100644
--- a/cpp/src/arrow/filesystem/azurefs.h
+++ b/cpp/src/arrow/filesystem/azurefs.h
@@ -38,8 +38,9 @@ class BlobServiceClient;
 }
 
 namespace Azure::Storage::Files::DataLake {
+class DataLakeFileSystemClient;
 class DataLakeServiceClient;
-}
+}  // namespace Azure::Storage::Files::DataLake
 
 namespace arrow::fs {
 
@@ -117,6 +118,54 @@ struct ARROW_EXPORT AzureOptions {
   MakeDataLakeServiceClient() const;
 };
 
+namespace internal {
+
+enum class HNSSupport {
+  kUnknown = 0,
+  kContainerNotFound = 1,
+  kDisabled = 2,
+  kEnabled = 3,
+};
+
+/// \brief Performs a request to check if the storage account has Hierarchical
+/// Namespace support enabled.
+///
+/// This check requires a DataLakeFileSystemClient for any container of the
+/// storage account. If the container doesn't exist yet, we just forward that
+/// error to the caller (kContainerNotFound) since that's a proper error to the operation
+/// on that container anyways -- no need to try again with or without the knowledge of
+/// Hierarchical Namespace support.
+///
+/// Hierarchical Namespace support can't easily be changed after the storage account is
+/// created and the feature is shared by all containers in the storage account.
+/// This means the result of this check can (and should!) be cached as soon as
+/// it returns a successful result on any container of the storage account (see
+/// AzureFileSystem::Impl).
+///
+/// The check consists of a call to DataLakeFileSystemClient::GetAccessControlList()
+/// on the root directory of the container. An approach taken by the Hadoop Azure
+/// project [1]. A more obvious approach would be to call
+/// BlobServiceClient::GetAccountInfo(), but that endpoint requires elevated
+/// permissions [2] that we can't generally rely on.
+///
+/// [1]:
+/// https://github.com/apache/hadoop/blob/7c6af6a5f626d18d68b656d085cc23e4c1f7a1ef/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java#L356.
+/// [2]:
+/// https://learn.microsoft.com/en-us/rest/api/storageservices/get-blob-service-properties?tabs=azure-ad#authorization
+///
+/// IMPORTANT: If the result is kEnabled or kDisabled, it doesn't necessarily mean that
+/// the container exists.
+///
+/// \param adlfs_client A DataLakeFileSystemClient for a container of the storage
+/// account.
+/// \return kEnabled/kDisabled/kContainerNotFound (kUnknown is never
+/// returned).
+Result<HNSSupport> CheckIfHierarchicalNamespaceIsEnabled(
+    Azure::Storage::Files::DataLake::DataLakeFileSystemClient& adlfs_client,
+    const AzureOptions& options);
+
+}  // namespace internal
+
 /// \brief FileSystem implementation backed by Azure Blob Storage (ABS) [1] and
 /// Azure Data Lake Storage Gen2 (ADLS Gen2) [2].
 ///
diff --git a/cpp/src/arrow/filesystem/azurefs_internal.cc b/cpp/src/arrow/filesystem/azurefs_internal.cc
deleted file mode 100644
index 39c3fb23e3..0000000000
--- a/cpp/src/arrow/filesystem/azurefs_internal.cc
+++ /dev/null
@@ -1,94 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/filesystem/azurefs_internal.h"
-
-#include <azure/storage/files/datalake.hpp>
-
-#include "arrow/result.h"
-
-namespace arrow::fs::internal {
-
-namespace {
-
-// TODO(GH-38772): Remove azurefs_internal.h/.cc by moving the detector to
-// azurefs.cc (which contains a private copy of this helper function already).
-Status ExceptionToStatus(const std::string& prefix,
-                         const Azure::Storage::StorageException& exception) {
-  return Status::IOError(prefix, " Azure Error: ", exception.what());
-}
-
-}  // namespace
-
-Status HierarchicalNamespaceDetector::Init(
-    Azure::Storage::Files::DataLake::DataLakeServiceClient* datalake_service_client) {
-  datalake_service_client_ = datalake_service_client;
-  return Status::OK();
-}
-
-Result<bool> HierarchicalNamespaceDetector::Enabled(const std::string& container_name) {
-  // Hierarchical namespace can't easily be changed after the storage account is created
-  // and its common across all containers in the storage account. Do nothing until we've
-  // checked for a cached result.
-  if (enabled_.has_value()) {
-    return enabled_.value();
-  }
-
-  // This approach is inspired by hadoop-azure
-  // https://github.com/apache/hadoop/blob/7c6af6a5f626d18d68b656d085cc23e4c1f7a1ef/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java#L356.
-  // Unfortunately `blob_service_client->GetAccountInfo()` requires significantly
-  // elevated permissions.
-  // https://learn.microsoft.com/en-us/rest/api/storageservices/get-blob-service-properties?tabs=azure-ad#authorization
-  auto filesystem_client = datalake_service_client_->GetFileSystemClient(container_name);
-  auto directory_client = filesystem_client.GetDirectoryClient("/");
-  try {
-    directory_client.GetAccessControlList();
-    enabled_ = true;
-  } catch (const Azure::Storage::StorageException& exception) {
-    // GetAccessControlList will fail on storage accounts without hierarchical
-    // namespace enabled.
-
-    if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::BadRequest ||
-        exception.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict) {
-      // Flat namespace storage accounts with soft delete enabled return
-      // Conflict - This endpoint does not support BlobStorageEvents or SoftDelete
-      // otherwise it returns: BadRequest - This operation is only supported on a
-      // hierarchical namespace account.
-      enabled_ = false;
-    } else if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
-      // Azurite returns NotFound.
-      try {
-        filesystem_client.GetProperties();
-        enabled_ = false;
-      } catch (const Azure::Storage::StorageException& exception) {
-        return ExceptionToStatus("Failed to confirm '" + filesystem_client.GetUrl() +
-                                     "' is an accessible container. Therefore the "
-                                     "hierarchical namespace check was invalid.",
-                                 exception);
-      }
-    } else {
-      return ExceptionToStatus(
-          "GetAccessControlList for '" + directory_client.GetUrl() +
-              "' failed with an unexpected Azure error, while checking "
-              "whether the storage account has hierarchical namespace enabled.",
-          exception);
-    }
-  }
-  return enabled_.value();
-}
-
-}  // namespace arrow::fs::internal
diff --git a/cpp/src/arrow/filesystem/azurefs_internal.h b/cpp/src/arrow/filesystem/azurefs_internal.h
deleted file mode 100644
index 92592cf164..0000000000
--- a/cpp/src/arrow/filesystem/azurefs_internal.h
+++ /dev/null
@@ -1,39 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <optional>
-
-#include <azure/storage/files/datalake.hpp>
-
-#include "arrow/result.h"
-
-namespace arrow::fs::internal {
-
-class HierarchicalNamespaceDetector {
- public:
-  Status Init(
-      Azure::Storage::Files::DataLake::DataLakeServiceClient* datalake_service_client);
-  Result<bool> Enabled(const std::string& container_name);
-
- private:
-  Azure::Storage::Files::DataLake::DataLakeServiceClient* datalake_service_client_;
-  std::optional<bool> enabled_;
-};
-
-}  // namespace arrow::fs::internal
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc
index 8a39c4c554..db0e133e0d 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -34,7 +34,6 @@
 #include <boost/process.hpp>
 
 #include "arrow/filesystem/azurefs.h"
-#include "arrow/filesystem/azurefs_internal.h"
 
 #include <memory>
 #include <random>
@@ -520,7 +519,8 @@ class TestAzureFileSystem : public ::testing::Test {
 
  // Tests that are called from more than one implementation of TestAzureFileSystem
 
-  void TestDetectHierarchicalNamespace();
+  void TestDetectHierarchicalNamespace(bool trip_up_azurite);
+  void TestDetectHierarchicalNamespaceOnMissingContainer();
   void TestGetFileInfoObject();
   void TestGetFileInfoObjectWithNestedStructure();
 
@@ -610,14 +610,49 @@ class TestAzureFileSystem : public ::testing::Test {
   }
 };
 
-void TestAzureFileSystem::TestDetectHierarchicalNamespace() {
-  // Check the environments are implemented and injected here correctly.
-  auto expected = WithHierarchicalNamespace();
+void TestAzureFileSystem::TestDetectHierarchicalNamespace(bool trip_up_azurite) {
+  EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+  if (trip_up_azurite && env->backend() != AzureBackend::kAzurite) {
+    GTEST_SKIP() << "trip_up_azurite=true is only for Azurite.";
+  }
 
   auto data = SetUpPreexistingData();
-  auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
-  ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
-  ASSERT_OK_AND_EQ(expected, hierarchical_namespace.Enabled(data.container_name));
+  if (trip_up_azurite) {
+    // Azurite causes GetDirectoryClient("/") to throw a std::out_of_range
+    // exception when a "/" blob exists, so we exercise that code path.
+    auto container_client =
+        blob_service_client_->GetBlobContainerClient(data.container_name);
+    CreateBlob(container_client, "/");
+  }
+
+  auto adlfs_client = datalake_service_client_->GetFileSystemClient(data.container_name);
+  ASSERT_OK_AND_ASSIGN(auto hns_support, internal::CheckIfHierarchicalNamespaceIsEnabled(
+                                             adlfs_client, options_));
+  if (env->WithHierarchicalNamespace()) {
+    ASSERT_EQ(hns_support, internal::HNSSupport::kEnabled);
+  } else {
+    ASSERT_EQ(hns_support, internal::HNSSupport::kDisabled);
+  }
+}
+
+void TestAzureFileSystem::TestDetectHierarchicalNamespaceOnMissingContainer() {
+  auto container_name = PreexistingData::RandomContainerName(rng_);
+  auto adlfs_client = datalake_service_client_->GetFileSystemClient(container_name);
+  ASSERT_OK_AND_ASSIGN(auto hns_support, internal::CheckIfHierarchicalNamespaceIsEnabled(
+                                             adlfs_client, options_));
+  EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+  switch (env->backend()) {
+    case AzureBackend::kAzurite:
+      ASSERT_EQ(hns_support, internal::HNSSupport::kDisabled);
+      break;
+    case AzureBackend::kAzure:
+      if (env->WithHierarchicalNamespace()) {
+        ASSERT_EQ(hns_support, internal::HNSSupport::kContainerNotFound);
+      } else {
+        ASSERT_EQ(hns_support, internal::HNSSupport::kDisabled);
+      }
+      break;
+  }
 }
 
 void TestAzureFileSystem::TestGetFileInfoObject() {
@@ -733,7 +768,12 @@ using AllEnvironments =
 TYPED_TEST_SUITE(AzureFileSystemTestOnAllEnvs, AllEnvironments);
 
 TYPED_TEST(AzureFileSystemTestOnAllEnvs, DetectHierarchicalNamespace) {
-  this->TestDetectHierarchicalNamespace();
+  this->TestDetectHierarchicalNamespace(true);
+  this->TestDetectHierarchicalNamespace(false);
+}
+
+TYPED_TEST(AzureFileSystemTestOnAllEnvs, DetectHierarchicalNamespaceOnMissingContainer) {
+  this->TestDetectHierarchicalNamespaceOnMissingContainer();
 }
 
 TYPED_TEST(AzureFileSystemTestOnAllEnvs, GetFileInfoObject) {
@@ -817,12 +857,6 @@ TEST_F(TestAzureHierarchicalNSFileSystem, DeleteDirContentsFailureNonexistent) {
 
 // Tests using Azurite (the local Azure emulator)
 
-TEST_F(TestAzuriteFileSystem, DetectHierarchicalNamespaceFailsWithMissingContainer) {
-  auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
-  ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
-  ASSERT_RAISES(IOError, hierarchical_namespace.Enabled("nonexistent-container"));
-}
-
 TEST_F(TestAzuriteFileSystem, GetFileInfoAccount) {
   AssertFileInfo(fs_.get(), "", FileType::Directory);
 

