pitrou commented on code in PR #39207:
URL: https://github.com/apache/arrow/pull/39207#discussion_r1425685047


##########
cpp/src/arrow/filesystem/azurefs.h:
##########
@@ -25,86 +25,104 @@
 #include "arrow/util/macros.h"
 #include "arrow/util/uri.h"
 
-namespace Azure {
-namespace Core {
-namespace Credentials {
-
+namespace Azure::Core::Credentials {
 class TokenCredential;
+}
 
-}  // namespace Credentials
-}  // namespace Core
-namespace Storage {
-
+namespace Azure::Storage {
 class StorageSharedKeyCredential;
+}
 
-}  // namespace Storage
-}  // namespace Azure
-
-namespace arrow {
-namespace fs {
-
-enum class AzureCredentialsKind : int8_t {
-  /// Anonymous access (no credentials used), public
-  Anonymous,
-  /// Use explicitly-provided access key pair
-  StorageCredentials,
-  /// Use ServicePrincipleCredentials
-  ServicePrincipleCredentials,
-  /// Use Sas Token to authenticate
-  Sas,
-  /// Use Connection String
-  ConnectionString
-};
+namespace Azure::Storage::Blobs {
+class BlobServiceClient;
+}
+
+namespace Azure::Storage::Files::DataLake {
+class DataLakeServiceClient;
+}
+
+namespace arrow::fs {
 
 enum class AzureBackend : bool {

Review Comment:
   Hmm... I don't really want to think about the consequences of using 
something else than a regular integer type for the enum base type. Why not 
simply `int`? We're not trying to pack bits here.



##########
cpp/src/arrow/filesystem/azurefs.h:
##########
@@ -25,86 +25,104 @@
 #include "arrow/util/macros.h"
 #include "arrow/util/uri.h"
 
-namespace Azure {
-namespace Core {
-namespace Credentials {
-
+namespace Azure::Core::Credentials {
 class TokenCredential;
+}
 
-}  // namespace Credentials
-}  // namespace Core
-namespace Storage {
-
+namespace Azure::Storage {
 class StorageSharedKeyCredential;
+}
 
-}  // namespace Storage
-}  // namespace Azure
-
-namespace arrow {
-namespace fs {
-
-enum class AzureCredentialsKind : int8_t {
-  /// Anonymous access (no credentials used), public
-  Anonymous,
-  /// Use explicitly-provided access key pair
-  StorageCredentials,
-  /// Use ServicePrincipleCredentials
-  ServicePrincipleCredentials,
-  /// Use Sas Token to authenticate
-  Sas,
-  /// Use Connection String
-  ConnectionString
-};
+namespace Azure::Storage::Blobs {
+class BlobServiceClient;
+}
+
+namespace Azure::Storage::Files::DataLake {
+class DataLakeServiceClient;
+}
+
+namespace arrow::fs {
 
 enum class AzureBackend : bool {
-  /// Official Azure Remote Backend
-  Azure,
-  /// Local Simulated Storage
-  Azurite
+  /// \brief Official Azure Remote Backend
+  kAzure,
+  /// \brief Local Simulated Storage
+  kAzurite
 };
 
 /// Options for the AzureFileSystem implementation.
 struct ARROW_EXPORT AzureOptions {
-  std::string account_dfs_url;
-  std::string account_blob_url;
-  AzureBackend backend = AzureBackend::Azure;
-  AzureCredentialsKind credentials_kind = AzureCredentialsKind::Anonymous;
+  /// \brief The backend to connect to: Azure or Azurite (for testing).
+  AzureBackend backend = AzureBackend::kAzure;

Review Comment:
   Do we plan to autodetect the backend at some point? We do that in S3 (the 
`DetectS3Backend` function).



##########
cpp/src/arrow/filesystem/azurefs.h:
##########
@@ -25,86 +25,104 @@
 #include "arrow/util/macros.h"
 #include "arrow/util/uri.h"
 
-namespace Azure {
-namespace Core {
-namespace Credentials {
-
+namespace Azure::Core::Credentials {
 class TokenCredential;
+}
 
-}  // namespace Credentials
-}  // namespace Core
-namespace Storage {
-
+namespace Azure::Storage {
 class StorageSharedKeyCredential;
+}
 
-}  // namespace Storage
-}  // namespace Azure
-
-namespace arrow {
-namespace fs {
-
-enum class AzureCredentialsKind : int8_t {
-  /// Anonymous access (no credentials used), public
-  Anonymous,
-  /// Use explicitly-provided access key pair
-  StorageCredentials,
-  /// Use ServicePrincipleCredentials
-  ServicePrincipleCredentials,
-  /// Use Sas Token to authenticate
-  Sas,
-  /// Use Connection String
-  ConnectionString
-};
+namespace Azure::Storage::Blobs {
+class BlobServiceClient;
+}
+
+namespace Azure::Storage::Files::DataLake {
+class DataLakeServiceClient;
+}
+
+namespace arrow::fs {
 
 enum class AzureBackend : bool {
-  /// Official Azure Remote Backend
-  Azure,
-  /// Local Simulated Storage
-  Azurite
+  /// \brief Official Azure Remote Backend
+  kAzure,
+  /// \brief Local Simulated Storage
+  kAzurite
 };
 
 /// Options for the AzureFileSystem implementation.
 struct ARROW_EXPORT AzureOptions {
-  std::string account_dfs_url;
-  std::string account_blob_url;
-  AzureBackend backend = AzureBackend::Azure;
-  AzureCredentialsKind credentials_kind = AzureCredentialsKind::Anonymous;
+  /// \brief The backend to connect to: Azure or Azurite (for testing).
+  AzureBackend backend = AzureBackend::kAzure;
 
-  std::string sas_token;
-  std::string connection_string;
-  std::shared_ptr<Azure::Storage::StorageSharedKeyCredential>
-      storage_credentials_provider;
-  std::shared_ptr<Azure::Core::Credentials::TokenCredential>
-      service_principle_credentials_provider;
+  // TODO(GH-38598): Add support for more auth methods.
+  // std::string connection_string;
+  // std::string sas_token;
 
   /// \brief Default metadata for OpenOutputStream.
   ///
   /// This will be ignored if non-empty metadata is passed to OpenOutputStream.
   std::shared_ptr<const KeyValueMetadata> default_metadata;
 
+ private:
+  std::string account_blob_url_;
+  std::string account_dfs_url_;
+
+  enum class CredentialKind {
+    kAnonymous,
+    kStorageSharedKeyCredential,
+  } credential_kind_ = CredentialKind::kAnonymous;
+
+  std::shared_ptr<Azure::Storage::StorageSharedKeyCredential>
+      storage_shared_key_credential_;
+
+ public:
   AzureOptions();
+  ~AzureOptions();
 
-  Status ConfigureAccountKeyCredentials(const std::string& account_name,
-                                        const std::string& account_key);
+  Status ConfigureAccountKeyCredential(const std::string& account_name,
+                                       const std::string& account_key);
 
   bool Equals(const AzureOptions& other) const;
+
+  const std::string& AccountBlobUrl() const { return account_blob_url_; }
+  const std::string& AccountDfsUrl() const { return account_dfs_url_; }
+
+  std::unique_ptr<Azure::Storage::Blobs::BlobServiceClient> 
MakeBlobServiceClient() const;
+
+  std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeServiceClient>
+  MakeDataLakeServiceClient() const;

Review Comment:
   Is it useful to expose these in the public API? We are not rewriting a Azure 
SDK API...



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -193,51 +265,123 @@ TEST(AzureFileSystem, OptionsCompare) {
   EXPECT_TRUE(options.Equals(options));
 }
 
-class AzureFileSystemTest : public ::testing::Test {
+struct PreexistingData {
+ public:
+  using RNG = std::mt19937_64;

Review Comment:
   I don't know if this is used to generate large-ish data, but if it is, there 
are faster random generators available.



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -193,51 +265,123 @@ TEST(AzureFileSystem, OptionsCompare) {
   EXPECT_TRUE(options.Equals(options));
 }
 
-class AzureFileSystemTest : public ::testing::Test {
+struct PreexistingData {
+ public:
+  using RNG = std::mt19937_64;
+
  public:
+  const std::string container_name;
+  static constexpr char const* kObjectName = "test-object-name";
+
+  static constexpr char const* kLoremIpsum = R"""(
+Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
+incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
+nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
+Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
+fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
+culpa qui officia deserunt mollit anim id est laborum.
+)""";
+
+ public:
+  explicit PreexistingData(RNG& rng) : 
container_name{RandomContainerName(rng)} {}
+
+  // Accessors
+  std::string ContainerPath() const { return container_name + '/'; }
+  std::string ObjectPath() const { return ContainerPath() + kObjectName; }
+  std::string NotFoundObjectPath() const { return ContainerPath() + 
"not-found"; }
+
+  std::string RandomDirectoryPath(RNG& rng) {
+    return internal::ConcatAbstractPath(container_name, 
RandomDirectoryName(rng));

Review Comment:
   I'm curious: why use `ContainerPath()` above and `ConcatAbstractPath()` here?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -310,10 +420,10 @@ class AzureFileSystemTest : public ::testing::Test {
   };
 
   // Need to use "void" as the return type to use ASSERT_* in this method.
-  void CreateHierarchicalData(HierarchicalPaths& paths) {
-    const auto container_path = RandomContainerName();
-    const auto directory_path =
-        internal::ConcatAbstractPath(container_path, RandomDirectoryName());
+  void CreateHierarchicalData(HierarchicalPaths* paths) {

Review Comment:
   +1



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -377,199 +487,337 @@ class AzureFileSystemTest : public ::testing::Test {
                    strlen(kSubData));
     AssertFileInfo(infos[10], "container/somefile", FileType::File, 
strlen(kSomeData));
     AssertFileInfo(infos[11], "empty-container", FileType::Directory);
-    AssertFileInfo(infos[12], PreexistingContainerName(), FileType::Directory);
-    AssertFileInfo(infos[13], PreexistingObjectPath(), FileType::File);
   }
-};
 
-class AzuriteFileSystemTest : public AzureFileSystemTest {
-  Result<AzureOptions> MakeOptions() override {
-    EXPECT_THAT(GetAzuriteEnv(), NotNull());
-    ARROW_EXPECT_OK(GetAzuriteEnv()->status());
-    ARROW_ASSIGN_OR_RAISE(debug_log_start_, 
GetAzuriteEnv()->GetDebugLogSize());
-    AzureOptions options;
-    options.backend = AzureBackend::Azurite;
-    ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(
-        GetAzuriteEnv()->account_name(), GetAzuriteEnv()->account_key()));
-    return options;
+  bool WithHierarchicalNamespace() const {
+    EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+    return env->WithHierarchicalNamespace();
   }
 
-  void TearDown() override {
-    AzureFileSystemTest::TearDown();
-    if (HasFailure()) {
-      // XXX: This may not include all logs in the target test because
-      // Azurite doesn't flush debug logs immediately... You may want
-      // to check the log manually...
-      ARROW_IGNORE_EXPR(GetAzuriteEnv()->DumpDebugLog(debug_log_start_));
+  // Tests that are called from more than one implementation of 
AzureFileSystemTest
+
+  void DetectHierarchicalNamespaceTest();
+  void GetFileInfoObjectTest();
+  void GetFileInfoObjectWithNestedStructureTest();
+
+  void DeleteDirSuccessEmptyTest() {
+    auto data = SetUpPreexistingData();
+    const auto directory_path =
+        internal::ConcatAbstractPath(data.container_name, 
data.RandomDirectoryName(rng_));
+
+    if (WithHierarchicalNamespace()) {
+      ASSERT_OK(fs_->CreateDir(directory_path, true));
+      arrow::fs::AssertFileInfo(fs_.get(), directory_path, 
FileType::Directory);
+      ASSERT_OK(fs_->DeleteDir(directory_path));
+      arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
+    } else {
+      // There is only virtual directory without hierarchical namespace
+      // support. So the CreateDir() and DeleteDir() do nothing.
+      ASSERT_OK(fs_->CreateDir(directory_path));
+      arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
+      ASSERT_OK(fs_->DeleteDir(directory_path));
+      arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
     }
   }
 
-  int64_t debug_log_start_ = 0;
-};
+  void CreateDirSuccessContainerAndDirectoryTest() {
+    auto data = SetUpPreexistingData();
+    const auto path = data.RandomDirectoryPath(rng_);
+    ASSERT_OK(fs_->CreateDir(path, false));
+    if (WithHierarchicalNamespace()) {
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::Directory);
+    } else {
+      // There is only virtual directory without hierarchical namespace
+      // support. So the CreateDir() does nothing.
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound);
+    }
+  }
 
-class AzureFlatNamespaceFileSystemTest : public AzureFileSystemTest {
-  Result<AzureOptions> MakeOptions() override {
-    AzureOptions options;
-    const auto account_key = std::getenv("AZURE_FLAT_NAMESPACE_ACCOUNT_KEY");
-    const auto account_name = std::getenv("AZURE_FLAT_NAMESPACE_ACCOUNT_NAME");
-    if (account_key && account_name) {
-      RETURN_NOT_OK(options.ConfigureAccountKeyCredentials(account_name, 
account_key));
-      return options;
+  void CreateDirRecursiveSuccessContainerOnlyTest() {
+    auto container_name = PreexistingData::RandomContainerName(rng_);
+    ASSERT_OK(fs_->CreateDir(container_name, true));
+    arrow::fs::AssertFileInfo(fs_.get(), container_name, FileType::Directory);
+  }
+
+  void CreateDirRecursiveSuccessDirectoryOnlyTest() {
+    auto data = SetUpPreexistingData();
+    const auto parent = data.RandomDirectoryPath(rng_);
+    const auto path = internal::ConcatAbstractPath(parent, "new-sub");
+    ASSERT_OK(fs_->CreateDir(path, true));
+    if (WithHierarchicalNamespace()) {
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::Directory);
+      arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::Directory);
+    } else {
+      // There is only virtual directory without hierarchical namespace
+      // support. So the CreateDir() does nothing.
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound);
+      arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::NotFound);
     }
-    return Status::Cancelled(
-        "Connection details not provided for a real flat namespace "
-        "account.");
   }
-};
 
-// How to enable this test:
-//
-// You need an Azure account. You should be able to create a free
-// account at https://azure.microsoft.com/en-gb/free/ . You should be
-// able to create a storage account through the portal Web UI.
-//
-// See also the official document how to create a storage account:
-// 
https://learn.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account
-//
-// A few suggestions on configuration:
-//
-// * Use Standard general-purpose v2 not premium
-// * Use LRS redundancy
-// * Obviously you need to enable hierarchical namespace.
-// * Set the default access tier to hot
-// * SFTP, NFS and file shares are not required.
-class AzureHierarchicalNamespaceFileSystemTest : public AzureFileSystemTest {
-  Result<AzureOptions> MakeOptions() override {
-    AzureOptions options;
-    const auto account_key = 
std::getenv("AZURE_HIERARCHICAL_NAMESPACE_ACCOUNT_KEY");
-    const auto account_name = 
std::getenv("AZURE_HIERARCHICAL_NAMESPACE_ACCOUNT_NAME");
-    if (account_key && account_name) {
-      RETURN_NOT_OK(options.ConfigureAccountKeyCredentials(account_name, 
account_key));
-      return options;
+  void CreateDirRecursiveSuccessContainerAndDirectoryTest() {
+    auto data = SetUpPreexistingData();
+    const auto parent = data.RandomDirectoryPath(rng_);
+    const auto path = internal::ConcatAbstractPath(parent, "new-sub");
+    ASSERT_OK(fs_->CreateDir(path, true));
+    if (WithHierarchicalNamespace()) {
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::Directory);
+      arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::Directory);
+      arrow::fs::AssertFileInfo(fs_.get(), data.container_name, 
FileType::Directory);
+    } else {
+      // There is only virtual directory without hierarchical namespace
+      // support. So the CreateDir() does nothing.
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound);
+      arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::NotFound);
+      arrow::fs::AssertFileInfo(fs_.get(), data.container_name, 
FileType::Directory);
     }
-    return Status::Cancelled(
-        "Connection details not provided for a real hierarchical namespace "
-        "account.");
   }
-};
 
-TEST_F(AzureFlatNamespaceFileSystemTest, DetectHierarchicalNamespace) {
-  auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
-  ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
-  ASSERT_OK_AND_EQ(false, 
hierarchical_namespace.Enabled(PreexistingContainerName()));
-}
+  void DeleteDirContentsSuccessNonexistentTest() {
+    auto data = SetUpPreexistingData();
+    const auto directory_path = data.RandomDirectoryPath(rng_);
+    ASSERT_OK(fs_->DeleteDirContents(directory_path, true));
+    arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
+  }
 
-TEST_F(AzureHierarchicalNamespaceFileSystemTest, DetectHierarchicalNamespace) {
-  auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
-  ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
-  ASSERT_OK_AND_EQ(true, 
hierarchical_namespace.Enabled(PreexistingContainerName()));
-}
+  void DeleteDirContentsFailureNonexistentTest() {
+    auto data = SetUpPreexistingData();
+    const auto directory_path = data.RandomDirectoryPath(rng_);
+    ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false));
+  }
+};
 
-TEST_F(AzuriteFileSystemTest, DetectHierarchicalNamespace) {
-  auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
-  ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
-  ASSERT_OK_AND_EQ(false, 
hierarchical_namespace.Enabled(PreexistingContainerName()));
-}
+void AzureFileSystemTest::DetectHierarchicalNamespaceTest() {
+  // Check the environments are implemented and injected here correctly.
+  auto expected = WithHierarchicalNamespace();
 
-TEST_F(AzuriteFileSystemTest, 
DetectHierarchicalNamespaceFailsWithMissingContainer) {
+  auto data = SetUpPreexistingData();
   auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
   ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
-  ASSERT_NOT_OK(hierarchical_namespace.Enabled("nonexistent-container"));
+  ASSERT_OK_AND_EQ(expected, 
hierarchical_namespace.Enabled(data.container_name));
 }
 
-TEST_F(AzuriteFileSystemTest, GetFileInfoAccount) {
-  AssertFileInfo(fs_.get(), "", FileType::Directory);
-
-  // URI
-  ASSERT_RAISES(Invalid, fs_->GetFileInfo("abfs://"));
-}
-
-TEST_F(AzuriteFileSystemTest, GetFileInfoContainer) {
-  AssertFileInfo(fs_.get(), PreexistingContainerName(), FileType::Directory);
+void AzureFileSystemTest::GetFileInfoObjectTest() {
+  auto data = SetUpPreexistingData();
+  auto object_properties =
+      blob_service_client_->GetBlobContainerClient(data.container_name)
+          .GetBlobClient(data.kObjectName)
+          .GetProperties()
+          .Value;
 
-  AssertFileInfo(fs_.get(), "nonexistent-container", FileType::NotFound);
+  AssertFileInfo(fs_.get(), data.ObjectPath(), FileType::File,
+                 
std::chrono::system_clock::time_point{object_properties.LastModified},
+                 static_cast<int64_t>(object_properties.BlobSize));
 
   // URI
-  ASSERT_RAISES(Invalid, fs_->GetFileInfo("abfs://" + 
PreexistingContainerName()));
+  ASSERT_RAISES(Invalid, fs_->GetFileInfo("abfs://" + 
std::string{data.kObjectName}));
 }
 
-void AzureFileSystemTest::RunGetFileInfoObjectWithNestedStructureTest() {
+void AzureFileSystemTest::GetFileInfoObjectWithNestedStructureTest() {
+  auto data = SetUpPreexistingData();
   // Adds detailed tests to handle cases of different edge cases
   // with directory naming conventions (e.g. with and without slashes).
   constexpr auto kObjectName = 
"test-object-dir/some_other_dir/another_dir/foo";
   ASSERT_OK_AND_ASSIGN(
       auto output,
-      fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName, 
/*metadata=*/{}));
-  const std::string_view data(kLoremIpsum);
-  ASSERT_OK(output->Write(data));
+      fs_->OpenOutputStream(data.ContainerPath() + kObjectName, 
/*metadata=*/{}));
+  const std::string_view lorem_ipsum(PreexistingData::kLoremIpsum);
+  ASSERT_OK(output->Write(lorem_ipsum));
   ASSERT_OK(output->Close());
 
   // 0 is immediately after "/" lexicographically, ensure that this doesn't
   // cause unexpected issues.
-  ASSERT_OK_AND_ASSIGN(output,
-                       fs_->OpenOutputStream(
-                           PreexistingContainerPath() + 
"test-object-dir/some_other_dir0",
-                           /*metadata=*/{}));
-  ASSERT_OK(output->Write(data));
-  ASSERT_OK(output->Close());
   ASSERT_OK_AND_ASSIGN(
-      output, fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName + 
"0",
-                                    /*metadata=*/{}));
-  ASSERT_OK(output->Write(data));
+      output,
+      fs_->OpenOutputStream(data.ContainerPath() + 
"test-object-dir/some_other_dir0",
+                            /*metadata=*/{}));
+  ASSERT_OK(output->Write(lorem_ipsum));
+  ASSERT_OK(output->Close());
+  ASSERT_OK_AND_ASSIGN(output,
+                       fs_->OpenOutputStream(data.ContainerPath() + 
kObjectName + "0",
+                                             /*metadata=*/{}));
+  ASSERT_OK(output->Write(lorem_ipsum));
   ASSERT_OK(output->Close());
 
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName, 
FileType::File);
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName + "/",
-                 FileType::NotFound);
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-dir",
+  AssertFileInfo(fs_.get(), data.ContainerPath() + kObjectName, 
FileType::File);
+  AssertFileInfo(fs_.get(), data.ContainerPath() + kObjectName + "/", 
FileType::NotFound);
+  AssertFileInfo(fs_.get(), data.ContainerPath() + "test-object-dir",
                  FileType::Directory);
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-dir/",
+  AssertFileInfo(fs_.get(), data.ContainerPath() + "test-object-dir/",
                  FileType::Directory);
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + 
"test-object-dir/some_other_dir",
+  AssertFileInfo(fs_.get(), data.ContainerPath() + 
"test-object-dir/some_other_dir",
                  FileType::Directory);
-  AssertFileInfo(fs_.get(),
-                 PreexistingContainerPath() + 
"test-object-dir/some_other_dir/",
+  AssertFileInfo(fs_.get(), data.ContainerPath() + 
"test-object-dir/some_other_dir/",
                  FileType::Directory);
 
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-di",
-                 FileType::NotFound);
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + 
"test-object-dir/some_other_di",
+  AssertFileInfo(fs_.get(), data.ContainerPath() + "test-object-di", 
FileType::NotFound);
+  AssertFileInfo(fs_.get(), data.ContainerPath() + 
"test-object-dir/some_other_di",
                  FileType::NotFound);
+
+  if (WithHierarchicalNamespace()) {
+    datalake_service_client_->GetFileSystemClient(data.container_name)
+        .GetDirectoryClient("test-empty-object-dir")
+        .Create();
+
+    AssertFileInfo(fs_.get(), data.ContainerPath() + "test-empty-object-dir",
+                   FileType::Directory);
+  }
 }
 
-TEST_F(AzuriteFileSystemTest, GetFileInfoObjectWithNestedStructure) {
-  RunGetFileInfoObjectWithNestedStructureTest();
+template <class AzureEnvClass>
+class AzureFileSystemTestImpl : public AzureFileSystemTest {
+ public:
+  using AzureFileSystemTest::AzureFileSystemTest;
+
+  Result<BaseAzureEnv*> GetAzureEnv() const final { return 
AzureEnvClass::GetInstance(); }
+};
+
+// How to enable the non-Azurite tests:
+//
+// You need an Azure account. You should be able to create a free account [1].
+// Through the portal Web UI, you should create a storage account [2].
+//
+// A few suggestions on configuration:
+//
+// * Use Standard general-purpose v2 not premium
+// * Use LRS redundancy
+// * Set the default access tier to hot
+// * SFTP, NFS and file shares are not required.
+//
+// You must not enable Hierarchical Namespace on the storage account used for
+// AzureFlatNSFileSystemTest, but you must enable it on the storage account
+// used for AzureHierarchicalNSFileSystemTest.
+//
+// The credentials should be placed in the correct environment variables:
+//
+// * AZURE_FLAT_NAMESPACE_ACCOUNT_NAME
+// * AZURE_FLAT_NAMESPACE_ACCOUNT_KEY
+// * AZURE_HIERARCHICAL_NAMESPACE_ACCOUNT_NAME
+// * AZURE_HIERARCHICAL_NAMESPACE_ACCOUNT_KEY
+//
+// [1]: https://azure.microsoft.com/en-gb/free/
+// [2]:
+// 
https://learn.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account
+using AzureFlatNSFileSystemTest = AzureFileSystemTestImpl<AzureFlatNSEnv>;
+using AzureHierarchicalNSFileSystemTest = 
AzureFileSystemTestImpl<AzureHierarchicalNSEnv>;
+using AzuriteFileSystemTest = AzureFileSystemTestImpl<AzuriteEnv>;
+
+// Tests using all the 3 environments (Azurite, Azure w/o HNS (flat), Azure w/ 
HNS)
+
+template <class AzureEnvClass>
+using AzureFileSystemTestOnAllEnvs = AzureFileSystemTestImpl<AzureEnvClass>;
+
+using AllEnvironments =
+    ::testing::Types<AzuriteEnv, AzureFlatNSEnv, AzureHierarchicalNSEnv>;
+
+TYPED_TEST_SUITE(AzureFileSystemTestOnAllEnvs, AllEnvironments);
+
+TYPED_TEST(AzureFileSystemTestOnAllEnvs, DetectHierarchicalNamespace) {
+  this->DetectHierarchicalNamespaceTest();
 }
 
-TEST_F(AzureHierarchicalNamespaceFileSystemTest, 
GetFileInfoObjectWithNestedStructure) {
-  RunGetFileInfoObjectWithNestedStructureTest();
-  datalake_service_client_->GetFileSystemClient(PreexistingContainerName())
-      .GetDirectoryClient("test-empty-object-dir")
-      .Create();
+TYPED_TEST(AzureFileSystemTestOnAllEnvs, GetFileInfoObject) {
+  this->GetFileInfoObjectTest();
+}
 
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + 
"test-empty-object-dir",
-                 FileType::Directory);
+TYPED_TEST(AzureFileSystemTestOnAllEnvs, DeleteDirSuccessEmpty) {
+  this->DeleteDirSuccessEmptyTest();
 }
 
-void AzureFileSystemTest::RunGetFileInfoObjectTest() {
-  auto object_properties =
-      blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
-          .GetBlobClient(PreexistingObjectName())
-          .GetProperties()
-          .Value;
+TYPED_TEST(AzureFileSystemTestOnAllEnvs, GetFileInfoObjectWithNestedStructure) 
{
+  this->GetFileInfoObjectWithNestedStructureTest();
+}
 
-  AssertFileInfo(fs_.get(), PreexistingObjectPath(), FileType::File,
-                 
std::chrono::system_clock::time_point(object_properties.LastModified),
-                 static_cast<int64_t>(object_properties.BlobSize));
+TYPED_TEST(AzureFileSystemTestOnAllEnvs, 
CreateDirSuccessContainerAndDirectory) {
+  this->CreateDirSuccessContainerAndDirectoryTest();
+}
+
+TYPED_TEST(AzureFileSystemTestOnAllEnvs, 
CreateDirRecursiveSuccessContainerOnly) {
+  this->CreateDirRecursiveSuccessContainerOnlyTest();
+}
+
+TYPED_TEST(AzureFileSystemTestOnAllEnvs, 
CreateDirRecursiveSuccessDirectoryOnly) {
+  this->CreateDirRecursiveSuccessDirectoryOnlyTest();
+}
+
+TYPED_TEST(AzureFileSystemTestOnAllEnvs, 
CreateDirRecursiveSuccessContainerAndDirectory) {
+  this->CreateDirRecursiveSuccessContainerAndDirectoryTest();
+}
+
+// Tests using a real storage account *with Hierarchical Namespace enabled*
+
+TEST_F(AzureHierarchicalNSFileSystemTest, DeleteDirFailureNonexistent) {
+  auto data = SetUpPreexistingData();
+  const auto path = data.RandomDirectoryPath(rng_);
+  ASSERT_RAISES(IOError, fs_->DeleteDir(path));
+}
+
+TEST_F(AzureHierarchicalNSFileSystemTest, DeleteDirSuccessHaveBlob) {
+  auto data = SetUpPreexistingData();
+  const auto directory_path = data.RandomDirectoryPath(rng_);
+  const auto blob_path = internal::ConcatAbstractPath(directory_path, 
"hello.txt");
+  ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(blob_path));
+  ASSERT_OK(output->Write(std::string_view("hello")));
+  ASSERT_OK(output->Close());
+  arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::File);
+  ASSERT_OK(fs_->DeleteDir(directory_path));
+  arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::NotFound);
+}
+
+TEST_F(AzureHierarchicalNSFileSystemTest, DeleteDirSuccessHaveDirectory) {
+  auto data = SetUpPreexistingData();
+  const auto parent = data.RandomDirectoryPath(rng_);
+  const auto path = internal::ConcatAbstractPath(parent, "new-sub");
+  ASSERT_OK(fs_->CreateDir(path, true));
+  arrow::fs::AssertFileInfo(fs_.get(), path, FileType::Directory);
+  arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::Directory);
+  ASSERT_OK(fs_->DeleteDir(parent));
+  arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound);
+  arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::NotFound);
+}
+
+TEST_F(AzureHierarchicalNSFileSystemTest, DeleteDirContentsSuccessExist) {
+  auto preexisting_data = SetUpPreexistingData();
+  HierarchicalPaths paths;
+  CreateHierarchicalData(&paths);
+  ASSERT_OK(fs_->DeleteDirContents(paths.directory));
+  arrow::fs::AssertFileInfo(fs_.get(), paths.directory, FileType::Directory);
+  for (const auto& sub_path : paths.sub_paths) {
+    arrow::fs::AssertFileInfo(fs_.get(), sub_path, FileType::NotFound);
+  }
+}
+
+TEST_F(AzureHierarchicalNSFileSystemTest, DeleteDirContentsSuccessNonexistent) 
{
+  this->DeleteDirContentsSuccessNonexistentTest();
+}
+
+TEST_F(AzureHierarchicalNSFileSystemTest, DeleteDirContentsFailureNonexistent) 
{
+  this->DeleteDirContentsFailureNonexistentTest();
+}
+
+// Tests using Azurite (the local Azure emulator)
+
+TEST_F(AzuriteFileSystemTest, 
DetectHierarchicalNamespaceFailsWithMissingContainer) {
+  auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
+  ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
+  ASSERT_NOT_OK(hierarchical_namespace.Enabled("nonexistent-container"));

Review Comment:
   Do we expect it to raise a specific error? Can we use `ASSERT_RAISES`?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -71,56 +72,113 @@ using ::testing::Not;
 using ::testing::NotNull;
 
 namespace Blobs = Azure::Storage::Blobs;
-namespace Files = Azure::Storage::Files;
+namespace Core = Azure::Core;
+namespace DataLake = Azure::Storage::Files::DataLake;
 
-auto const* kLoremIpsum = R"""(
-Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
-incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
-nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
-Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
-fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
-culpa qui officia deserunt mollit anim id est laborum.
-)""";
+class BaseAzureEnv : public ::testing::Environment {
+ protected:
+  std::string account_name_;
+  std::string account_key_;
+
+  BaseAzureEnv(std::string account_name, std::string account_key)
+      : account_name_(std::move(account_name)), 
account_key_(std::move(account_key)) {}
 
-class AzuriteEnv : public ::testing::Environment {
  public:
-  AzuriteEnv() {
-    account_name_ = "devstoreaccount1";
-    account_key_ =
-        
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/"
-        "KBHBeksoGMGw==";
-    auto exe_path = bp::search_path("azurite");
-    if (exe_path.empty()) {
-      auto error = std::string("Could not find Azurite emulator.");
-      status_ = Status::Invalid(error);
-      return;
+  ~BaseAzureEnv() override = default;
+
+  const std::string& account_name() const { return account_name_; }
+  const std::string& account_key() const { return account_key_; }
+
+  virtual AzureBackend backend() const = 0;
+
+  virtual bool WithHierarchicalNamespace() const { return false; }
+
+  virtual Result<int64_t> GetDebugLogSize() { return 0; }
+  virtual Status DumpDebugLog(int64_t position) {
+    return Status::NotImplemented("BaseAzureEnv::DumpDebugLog");
+  }
+};
+
+template <class AzureEnvClass>
+class AzureEnvImpl : public BaseAzureEnv {
+ protected:
+  static Result<BaseAzureEnv*> MakeAndAddToGlobalTestEnvironment() {
+    ARROW_ASSIGN_OR_RAISE(auto instance, AzureEnvClass::Make());
+    auto* heap_ptr = instance.release();
+    ::testing::AddGlobalTestEnvironment(heap_ptr);
+    return heap_ptr;
+  }
+
+  using BaseAzureEnv::BaseAzureEnv;
+
+  static Result<std::unique_ptr<AzureEnvClass>> MakeFromEnvVars(

Review Comment:
   There are several different factory functions here, with differing 
signatures and no obvious clue as to which one should be used and when. Can you 
comment a bit on the purpose/motivation?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -71,56 +72,113 @@ using ::testing::Not;
 using ::testing::NotNull;
 
 namespace Blobs = Azure::Storage::Blobs;
-namespace Files = Azure::Storage::Files;
+namespace Core = Azure::Core;
+namespace DataLake = Azure::Storage::Files::DataLake;
 
-auto const* kLoremIpsum = R"""(
-Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
-incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
-nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
-Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
-fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
-culpa qui officia deserunt mollit anim id est laborum.
-)""";
+class BaseAzureEnv : public ::testing::Environment {
+ protected:
+  std::string account_name_;
+  std::string account_key_;
+
+  BaseAzureEnv(std::string account_name, std::string account_key)
+      : account_name_(std::move(account_name)), 
account_key_(std::move(account_key)) {}
 
-class AzuriteEnv : public ::testing::Environment {
  public:
-  AzuriteEnv() {
-    account_name_ = "devstoreaccount1";
-    account_key_ =
-        
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/"
-        "KBHBeksoGMGw==";
-    auto exe_path = bp::search_path("azurite");
-    if (exe_path.empty()) {
-      auto error = std::string("Could not find Azurite emulator.");
-      status_ = Status::Invalid(error);
-      return;
+  ~BaseAzureEnv() override = default;
+
+  const std::string& account_name() const { return account_name_; }
+  const std::string& account_key() const { return account_key_; }
+
+  virtual AzureBackend backend() const = 0;
+
+  virtual bool WithHierarchicalNamespace() const { return false; }
+
+  virtual Result<int64_t> GetDebugLogSize() { return 0; }
+  virtual Status DumpDebugLog(int64_t position) {
+    return Status::NotImplemented("BaseAzureEnv::DumpDebugLog");
+  }
+};
+
+template <class AzureEnvClass>
+class AzureEnvImpl : public BaseAzureEnv {
+ protected:
+  static Result<BaseAzureEnv*> MakeAndAddToGlobalTestEnvironment() {
+    ARROW_ASSIGN_OR_RAISE(auto instance, AzureEnvClass::Make());
+    auto* heap_ptr = instance.release();
+    ::testing::AddGlobalTestEnvironment(heap_ptr);
+    return heap_ptr;
+  }
+
+  using BaseAzureEnv::BaseAzureEnv;
+
+  static Result<std::unique_ptr<AzureEnvClass>> MakeFromEnvVars(
+      const std::string& account_name_var, const std::string& account_key_var) 
{
+    const auto account_name = std::getenv(account_name_var.c_str());
+    const auto account_key = std::getenv(account_key_var.c_str());
+    if (!account_name) {
+      return Status::Cancelled(account_name_var + " not set.");
     }
-    auto temp_dir_ = *TemporaryDir::Make("azurefs-test-");
-    auto debug_log_path_result = temp_dir_->path().Join("debug.log");
-    if (!debug_log_path_result.ok()) {
-      status_ = debug_log_path_result.status();
-      return;
+    if (!account_key) {
+      return Status::Cancelled(account_key_var + " not set.");
     }
-    debug_log_path_ = *debug_log_path_result;
-    server_process_ =
-        bp::child(boost::this_process::environment(), exe_path, "--silent", 
"--location",
-                  temp_dir_->path().ToString(), "--debug", 
debug_log_path_.ToString());
-    if (!(server_process_.valid() && server_process_.running())) {
-      auto error = "Could not start Azurite emulator.";
-      server_process_.terminate();
-      server_process_.wait();
-      status_ = Status::Invalid(error);
-      return;
+    return std::unique_ptr<AzureEnvClass>{new AzureEnvClass(account_name, 
account_key)};
+  }
+
+ public:
+  ~AzureEnvImpl() override = default;
+
+  static Result<BaseAzureEnv*> GetInstance() {
+    static auto env = MakeAndAddToGlobalTestEnvironment();
+    if (env.ok()) {
+      return env;
     }
-    status_ = Status::OK();
+    return env.status();
   }
 
+  AzureBackend backend() const final { return AzureEnvClass::kBackend; }
+};
+
+class AzuriteEnv : public AzureEnvImpl<AzuriteEnv> {
+ private:
+  std::unique_ptr<TemporaryDir> temp_dir_;
+  arrow::internal::PlatformFilename debug_log_path_;
+  bp::child server_process_;
+
+  using AzureEnvImpl::AzureEnvImpl;
+
+ public:
+  static const AzureBackend kBackend = AzureBackend::kAzurite;
+
   ~AzuriteEnv() override {
     server_process_.terminate();
     server_process_.wait();
   }
 
-  Result<int64_t> GetDebugLogSize() {
+  static Result<std::unique_ptr<AzureEnvImpl>> Make() {

Review Comment:
   Yet another factory function that doesn't even delegate to the parent :-)



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -193,51 +265,123 @@ TEST(AzureFileSystem, OptionsCompare) {
   EXPECT_TRUE(options.Equals(options));
 }
 
-class AzureFileSystemTest : public ::testing::Test {
+struct PreexistingData {
+ public:
+  using RNG = std::mt19937_64;
+
  public:
+  const std::string container_name;
+  static constexpr char const* kObjectName = "test-object-name";
+
+  static constexpr char const* kLoremIpsum = R"""(
+Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
+incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
+nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
+Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
+fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
+culpa qui officia deserunt mollit anim id est laborum.
+)""";
+
+ public:
+  explicit PreexistingData(RNG& rng) : 
container_name{RandomContainerName(rng)} {}
+
+  // Accessors
+  std::string ContainerPath() const { return container_name + '/'; }
+  std::string ObjectPath() const { return ContainerPath() + kObjectName; }
+  std::string NotFoundObjectPath() const { return ContainerPath() + 
"not-found"; }
+
+  std::string RandomDirectoryPath(RNG& rng) {
+    return internal::ConcatAbstractPath(container_name, 
RandomDirectoryName(rng));
+  }
+
+  // Utilities
+
+  static std::string RandomContainerName(RNG& rng) { return RandomChars(32, 
rng); }
+  static std::string RandomDirectoryName(RNG& rng) { return RandomChars(32, 
rng); }
+
+  static std::string RandomLine(int lineno, std::size_t width, RNG& rng) {

Review Comment:
   The codebase generally uses `int`, `int32_t` or `int64_t`. There is nothing 
conceptually wrong with `size_t` but the multiplicity of integer types makes 
the code less pleasant to read.
   
   (also, _please_ omit the `std::` prefix)



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -71,56 +72,113 @@ using ::testing::Not;
 using ::testing::NotNull;
 
 namespace Blobs = Azure::Storage::Blobs;
-namespace Files = Azure::Storage::Files;
+namespace Core = Azure::Core;
+namespace DataLake = Azure::Storage::Files::DataLake;
 
-auto const* kLoremIpsum = R"""(
-Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
-incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
-nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
-Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
-fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
-culpa qui officia deserunt mollit anim id est laborum.
-)""";
+class BaseAzureEnv : public ::testing::Environment {
+ protected:
+  std::string account_name_;
+  std::string account_key_;
+
+  BaseAzureEnv(std::string account_name, std::string account_key)
+      : account_name_(std::move(account_name)), 
account_key_(std::move(account_key)) {}
 
-class AzuriteEnv : public ::testing::Environment {
  public:
-  AzuriteEnv() {
-    account_name_ = "devstoreaccount1";
-    account_key_ =
-        
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/"
-        "KBHBeksoGMGw==";
-    auto exe_path = bp::search_path("azurite");
-    if (exe_path.empty()) {
-      auto error = std::string("Could not find Azurite emulator.");
-      status_ = Status::Invalid(error);
-      return;
+  ~BaseAzureEnv() override = default;
+
+  const std::string& account_name() const { return account_name_; }
+  const std::string& account_key() const { return account_key_; }
+
+  virtual AzureBackend backend() const = 0;
+
+  virtual bool WithHierarchicalNamespace() const { return false; }
+
+  virtual Result<int64_t> GetDebugLogSize() { return 0; }
+  virtual Status DumpDebugLog(int64_t position) {
+    return Status::NotImplemented("BaseAzureEnv::DumpDebugLog");
+  }
+};
+
+template <class AzureEnvClass>
+class AzureEnvImpl : public BaseAzureEnv {
+ protected:
+  static Result<BaseAzureEnv*> MakeAndAddToGlobalTestEnvironment() {
+    ARROW_ASSIGN_OR_RAISE(auto instance, AzureEnvClass::Make());
+    auto* heap_ptr = instance.release();
+    ::testing::AddGlobalTestEnvironment(heap_ptr);
+    return heap_ptr;
+  }
+
+  using BaseAzureEnv::BaseAzureEnv;
+
+  static Result<std::unique_ptr<AzureEnvClass>> MakeFromEnvVars(
+      const std::string& account_name_var, const std::string& account_key_var) 
{
+    const auto account_name = std::getenv(account_name_var.c_str());
+    const auto account_key = std::getenv(account_key_var.c_str());
+    if (!account_name) {
+      return Status::Cancelled(account_name_var + " not set.");
     }
-    auto temp_dir_ = *TemporaryDir::Make("azurefs-test-");
-    auto debug_log_path_result = temp_dir_->path().Join("debug.log");
-    if (!debug_log_path_result.ok()) {
-      status_ = debug_log_path_result.status();
-      return;
+    if (!account_key) {
+      return Status::Cancelled(account_key_var + " not set.");
     }
-    debug_log_path_ = *debug_log_path_result;
-    server_process_ =
-        bp::child(boost::this_process::environment(), exe_path, "--silent", 
"--location",
-                  temp_dir_->path().ToString(), "--debug", 
debug_log_path_.ToString());
-    if (!(server_process_.valid() && server_process_.running())) {
-      auto error = "Could not start Azurite emulator.";
-      server_process_.terminate();
-      server_process_.wait();
-      status_ = Status::Invalid(error);
-      return;
+    return std::unique_ptr<AzureEnvClass>{new AzureEnvClass(account_name, 
account_key)};
+  }
+
+ public:
+  ~AzureEnvImpl() override = default;

Review Comment:
   Is this useful?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -71,56 +72,113 @@ using ::testing::Not;
 using ::testing::NotNull;
 
 namespace Blobs = Azure::Storage::Blobs;
-namespace Files = Azure::Storage::Files;
+namespace Core = Azure::Core;
+namespace DataLake = Azure::Storage::Files::DataLake;
 
-auto const* kLoremIpsum = R"""(
-Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
-incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
-nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
-Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
-fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
-culpa qui officia deserunt mollit anim id est laborum.
-)""";
+class BaseAzureEnv : public ::testing::Environment {
+ protected:
+  std::string account_name_;
+  std::string account_key_;
+
+  BaseAzureEnv(std::string account_name, std::string account_key)
+      : account_name_(std::move(account_name)), 
account_key_(std::move(account_key)) {}
 
-class AzuriteEnv : public ::testing::Environment {
  public:
-  AzuriteEnv() {
-    account_name_ = "devstoreaccount1";
-    account_key_ =
-        
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/"
-        "KBHBeksoGMGw==";
-    auto exe_path = bp::search_path("azurite");
-    if (exe_path.empty()) {
-      auto error = std::string("Could not find Azurite emulator.");
-      status_ = Status::Invalid(error);
-      return;
+  ~BaseAzureEnv() override = default;
+
+  const std::string& account_name() const { return account_name_; }
+  const std::string& account_key() const { return account_key_; }
+
+  virtual AzureBackend backend() const = 0;
+
+  virtual bool WithHierarchicalNamespace() const { return false; }
+
+  virtual Result<int64_t> GetDebugLogSize() { return 0; }
+  virtual Status DumpDebugLog(int64_t position) {
+    return Status::NotImplemented("BaseAzureEnv::DumpDebugLog");
+  }
+};
+
+template <class AzureEnvClass>
+class AzureEnvImpl : public BaseAzureEnv {
+ protected:
+  static Result<BaseAzureEnv*> MakeAndAddToGlobalTestEnvironment() {
+    ARROW_ASSIGN_OR_RAISE(auto instance, AzureEnvClass::Make());
+    auto* heap_ptr = instance.release();
+    ::testing::AddGlobalTestEnvironment(heap_ptr);
+    return heap_ptr;
+  }
+
+  using BaseAzureEnv::BaseAzureEnv;
+
+  static Result<std::unique_ptr<AzureEnvClass>> MakeFromEnvVars(
+      const std::string& account_name_var, const std::string& account_key_var) 
{
+    const auto account_name = std::getenv(account_name_var.c_str());
+    const auto account_key = std::getenv(account_key_var.c_str());
+    if (!account_name) {
+      return Status::Cancelled(account_name_var + " not set.");
     }
-    auto temp_dir_ = *TemporaryDir::Make("azurefs-test-");
-    auto debug_log_path_result = temp_dir_->path().Join("debug.log");
-    if (!debug_log_path_result.ok()) {
-      status_ = debug_log_path_result.status();
-      return;
+    if (!account_key) {
+      return Status::Cancelled(account_key_var + " not set.");

Review Comment:
   If one env var is set but not the other, it should probably be an error 
rather than a skip.



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -71,56 +72,113 @@ using ::testing::Not;
 using ::testing::NotNull;
 
 namespace Blobs = Azure::Storage::Blobs;
-namespace Files = Azure::Storage::Files;
+namespace Core = Azure::Core;
+namespace DataLake = Azure::Storage::Files::DataLake;
 
-auto const* kLoremIpsum = R"""(
-Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
-incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
-nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
-Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
-fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
-culpa qui officia deserunt mollit anim id est laborum.
-)""";
+class BaseAzureEnv : public ::testing::Environment {
+ protected:
+  std::string account_name_;
+  std::string account_key_;
+
+  BaseAzureEnv(std::string account_name, std::string account_key)
+      : account_name_(std::move(account_name)), 
account_key_(std::move(account_key)) {}
 
-class AzuriteEnv : public ::testing::Environment {
  public:
-  AzuriteEnv() {
-    account_name_ = "devstoreaccount1";
-    account_key_ =
-        
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/"
-        "KBHBeksoGMGw==";
-    auto exe_path = bp::search_path("azurite");
-    if (exe_path.empty()) {
-      auto error = std::string("Could not find Azurite emulator.");
-      status_ = Status::Invalid(error);
-      return;
+  ~BaseAzureEnv() override = default;

Review Comment:
   Is this useful?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -193,51 +265,123 @@ TEST(AzureFileSystem, OptionsCompare) {
   EXPECT_TRUE(options.Equals(options));
 }
 
-class AzureFileSystemTest : public ::testing::Test {
+struct PreexistingData {
+ public:
+  using RNG = std::mt19937_64;
+
  public:
+  const std::string container_name;
+  static constexpr char const* kObjectName = "test-object-name";
+
+  static constexpr char const* kLoremIpsum = R"""(
+Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
+incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
+nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
+Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
+fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
+culpa qui officia deserunt mollit anim id est laborum.
+)""";
+
+ public:
+  explicit PreexistingData(RNG& rng) : 
container_name{RandomContainerName(rng)} {}
+
+  // Accessors
+  std::string ContainerPath() const { return container_name + '/'; }
+  std::string ObjectPath() const { return ContainerPath() + kObjectName; }
+  std::string NotFoundObjectPath() const { return ContainerPath() + 
"not-found"; }
+
+  std::string RandomDirectoryPath(RNG& rng) {
+    return internal::ConcatAbstractPath(container_name, 
RandomDirectoryName(rng));
+  }
+
+  // Utilities
+
+  static std::string RandomContainerName(RNG& rng) { return RandomChars(32, 
rng); }
+  static std::string RandomDirectoryName(RNG& rng) { return RandomChars(32, 
rng); }
+
+  static std::string RandomLine(int lineno, std::size_t width, RNG& rng) {
+    auto line = std::to_string(lineno) + ":    ";
+    line += RandomChars(width - line.size() - 1, rng);
+    line += '\n';
+    return line;
+  }
+
+  static std::size_t RandomIndex(std::size_t end, RNG& rng) {
+    return std::uniform_int_distribution<std::size_t>(0, end - 1)(rng);
+  }
+
+  static std::string RandomChars(std::size_t count, RNG& rng) {
+    auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789");
+    std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1);
+    std::string s;
+    std::generate_n(std::back_inserter(s), count, [&] { return 
fillers[d(rng)]; });
+    return s;
+  }
+};
+
+class AzureFileSystemTest : public ::testing::Test {
+ protected:
+  // Set in constructor
+  std::mt19937_64 rng_;
+
+  // Set in SetUp()
+  int64_t debug_log_start_ = 0;
+  bool set_up_succeeded_ = false;
+  AzureOptions options_;
+
   std::shared_ptr<FileSystem> fs_;
   std::unique_ptr<Blobs::BlobServiceClient> blob_service_client_;
-  std::unique_ptr<Files::DataLake::DataLakeServiceClient> 
datalake_service_client_;
-  AzureOptions options_;
-  std::mt19937_64 generator_;
-  std::string container_name_;
-  bool suite_skipped_ = false;
+  std::unique_ptr<DataLake::DataLakeServiceClient> datalake_service_client_;
 
-  AzureFileSystemTest() : generator_(std::random_device()()) {}
+ public:
+  AzureFileSystemTest() : rng_(std::random_device()()) {}
+
+  virtual Result<BaseAzureEnv*> GetAzureEnv() const = 0;
 
-  virtual Result<AzureOptions> MakeOptions() = 0;
+  static Result<AzureOptions> MakeOptions(BaseAzureEnv* env) {
+    AzureOptions options;
+    options.backend = env->backend();
+    ARROW_EXPECT_OK(
+        options.ConfigureAccountKeyCredential(env->account_name(), 
env->account_key()));
+    return options;
+  }
 
   void SetUp() override {
-    auto options = MakeOptions();
-    if (options.ok()) {
-      options_ = *options;
+    auto make_options = [this]() -> Result<AzureOptions> {
+      ARROW_ASSIGN_OR_RAISE(auto env, GetAzureEnv());
+      EXPECT_THAT(env, NotNull());
+      ARROW_ASSIGN_OR_RAISE(debug_log_start_, env->GetDebugLogSize());
+      return MakeOptions(env);
+    };
+    auto options_res = make_options();
+    if (!options_res.ok() && options_res.status().IsCancelled()) {

Review Comment:
   Can probably simply be
   ```suggestion
       if (options_res.status().IsCancelled()) {
   ```



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -254,54 +398,20 @@ class AzureFileSystemTest : public ::testing::Test {
     return blob_client;
   }
 
-  std::string PreexistingContainerName() const { return container_name_; }
-
-  std::string PreexistingContainerPath() const {
-    return PreexistingContainerName() + '/';
-  }
-
-  static std::string PreexistingObjectName() { return "test-object-name"; }
-
-  std::string PreexistingObjectPath() const {
-    return PreexistingContainerPath() + PreexistingObjectName();
-  }
-
-  std::string NotFoundObjectPath() { return PreexistingContainerPath() + 
"not-found"; }
-
-  std::string RandomLine(int lineno, std::size_t width) {
-    auto line = std::to_string(lineno) + ":    ";
-    line += RandomChars(width - line.size() - 1);
-    line += '\n';
-    return line;
-  }
-
-  std::size_t RandomIndex(std::size_t end) {
-    return std::uniform_int_distribution<std::size_t>(0, end - 1)(generator_);
-  }
-
-  std::string RandomChars(std::size_t count) {
-    auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789");
-    std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1);
-    std::string s;
-    std::generate_n(std::back_inserter(s), count, [&] { return 
fillers[d(generator_)]; });
-    return s;
-  }
-
-  std::string RandomContainerName() { return RandomChars(32); }
-
-  std::string RandomDirectoryName() { return RandomChars(32); }
-
-  void UploadLines(const std::vector<std::string>& lines, const char* 
path_to_file,
+  void UploadLines(const std::vector<std::string>& lines, const std::string& 
path,
                    int total_size) {
-    const auto path = PreexistingContainerPath() + path_to_file;
     ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {}));
     const auto all_lines = std::accumulate(lines.begin(), lines.end(), 
std::string(""));
     ASSERT_OK(output->Write(all_lines));
     ASSERT_OK(output->Close());
   }
 
-  void RunGetFileInfoObjectWithNestedStructureTest();
-  void RunGetFileInfoObjectTest();
+  PreexistingData SetUpPreexistingData() {
+    PreexistingData data(rng_);
+    auto container_client = CreateContainer(data.container_name);
+    CreateBlob(container_client, data.kObjectName, 
PreexistingData::kLoremIpsum);

Review Comment:
   Can `CreateBlob` throw an exception? 



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -193,51 +265,123 @@ TEST(AzureFileSystem, OptionsCompare) {
   EXPECT_TRUE(options.Equals(options));
 }
 
-class AzureFileSystemTest : public ::testing::Test {
+struct PreexistingData {
+ public:
+  using RNG = std::mt19937_64;
+
  public:
+  const std::string container_name;
+  static constexpr char const* kObjectName = "test-object-name";
+
+  static constexpr char const* kLoremIpsum = R"""(
+Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
+incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
+nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
+Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
+fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
+culpa qui officia deserunt mollit anim id est laborum.
+)""";
+
+ public:
+  explicit PreexistingData(RNG& rng) : 
container_name{RandomContainerName(rng)} {}
+
+  // Accessors
+  std::string ContainerPath() const { return container_name + '/'; }
+  std::string ObjectPath() const { return ContainerPath() + kObjectName; }
+  std::string NotFoundObjectPath() const { return ContainerPath() + 
"not-found"; }
+
+  std::string RandomDirectoryPath(RNG& rng) {
+    return internal::ConcatAbstractPath(container_name, 
RandomDirectoryName(rng));
+  }
+
+  // Utilities
+
+  static std::string RandomContainerName(RNG& rng) { return RandomChars(32, 
rng); }
+  static std::string RandomDirectoryName(RNG& rng) { return RandomChars(32, 
rng); }
+
+  static std::string RandomLine(int lineno, std::size_t width, RNG& rng) {
+    auto line = std::to_string(lineno) + ":    ";
+    line += RandomChars(width - line.size() - 1, rng);
+    line += '\n';
+    return line;
+  }
+
+  static std::size_t RandomIndex(std::size_t end, RNG& rng) {
+    return std::uniform_int_distribution<std::size_t>(0, end - 1)(rng);
+  }
+
+  static std::string RandomChars(std::size_t count, RNG& rng) {
+    auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789");
+    std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1);
+    std::string s;
+    std::generate_n(std::back_inserter(s), count, [&] { return 
fillers[d(rng)]; });
+    return s;
+  }
+};
+
+class AzureFileSystemTest : public ::testing::Test {
+ protected:
+  // Set in constructor
+  std::mt19937_64 rng_;
+
+  // Set in SetUp()
+  int64_t debug_log_start_ = 0;
+  bool set_up_succeeded_ = false;
+  AzureOptions options_;
+
   std::shared_ptr<FileSystem> fs_;
   std::unique_ptr<Blobs::BlobServiceClient> blob_service_client_;
-  std::unique_ptr<Files::DataLake::DataLakeServiceClient> 
datalake_service_client_;
-  AzureOptions options_;
-  std::mt19937_64 generator_;
-  std::string container_name_;
-  bool suite_skipped_ = false;
+  std::unique_ptr<DataLake::DataLakeServiceClient> datalake_service_client_;
 
-  AzureFileSystemTest() : generator_(std::random_device()()) {}
+ public:
+  AzureFileSystemTest() : rng_(std::random_device()()) {}

Review Comment:
   I'm not sure it's a good idea to vary the seed on each test invocation. For 
reproducibility, it would probably be better to use a fixed seed as in most 
other tests.



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -377,199 +487,337 @@ class AzureFileSystemTest : public ::testing::Test {
                    strlen(kSubData));
     AssertFileInfo(infos[10], "container/somefile", FileType::File, 
strlen(kSomeData));
     AssertFileInfo(infos[11], "empty-container", FileType::Directory);
-    AssertFileInfo(infos[12], PreexistingContainerName(), FileType::Directory);
-    AssertFileInfo(infos[13], PreexistingObjectPath(), FileType::File);
   }
-};
 
-class AzuriteFileSystemTest : public AzureFileSystemTest {
-  Result<AzureOptions> MakeOptions() override {
-    EXPECT_THAT(GetAzuriteEnv(), NotNull());
-    ARROW_EXPECT_OK(GetAzuriteEnv()->status());
-    ARROW_ASSIGN_OR_RAISE(debug_log_start_, 
GetAzuriteEnv()->GetDebugLogSize());
-    AzureOptions options;
-    options.backend = AzureBackend::Azurite;
-    ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(
-        GetAzuriteEnv()->account_name(), GetAzuriteEnv()->account_key()));
-    return options;
+  bool WithHierarchicalNamespace() const {
+    EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+    return env->WithHierarchicalNamespace();
   }
 
-  void TearDown() override {
-    AzureFileSystemTest::TearDown();
-    if (HasFailure()) {
-      // XXX: This may not include all logs in the target test because
-      // Azurite doesn't flush debug logs immediately... You may want
-      // to check the log manually...
-      ARROW_IGNORE_EXPR(GetAzuriteEnv()->DumpDebugLog(debug_log_start_));
+  // Tests that are called from more than one implementation of 
AzureFileSystemTest
+
+  void DetectHierarchicalNamespaceTest();

Review Comment:
   Can we keep the current convention of calling test methods `TestSomething` 
rather than `SomethingTest`?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -377,199 +487,337 @@ class AzureFileSystemTest : public ::testing::Test {
                    strlen(kSubData));
     AssertFileInfo(infos[10], "container/somefile", FileType::File, 
strlen(kSomeData));
     AssertFileInfo(infos[11], "empty-container", FileType::Directory);
-    AssertFileInfo(infos[12], PreexistingContainerName(), FileType::Directory);
-    AssertFileInfo(infos[13], PreexistingObjectPath(), FileType::File);
   }
-};
 
-class AzuriteFileSystemTest : public AzureFileSystemTest {
-  Result<AzureOptions> MakeOptions() override {
-    EXPECT_THAT(GetAzuriteEnv(), NotNull());
-    ARROW_EXPECT_OK(GetAzuriteEnv()->status());
-    ARROW_ASSIGN_OR_RAISE(debug_log_start_, 
GetAzuriteEnv()->GetDebugLogSize());
-    AzureOptions options;
-    options.backend = AzureBackend::Azurite;
-    ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(
-        GetAzuriteEnv()->account_name(), GetAzuriteEnv()->account_key()));
-    return options;
+  bool WithHierarchicalNamespace() const {

Review Comment:
   Should be `override`?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -377,199 +487,337 @@ class AzureFileSystemTest : public ::testing::Test {
                    strlen(kSubData));
     AssertFileInfo(infos[10], "container/somefile", FileType::File, 
strlen(kSomeData));
     AssertFileInfo(infos[11], "empty-container", FileType::Directory);
-    AssertFileInfo(infos[12], PreexistingContainerName(), FileType::Directory);
-    AssertFileInfo(infos[13], PreexistingObjectPath(), FileType::File);
   }
-};
 
-class AzuriteFileSystemTest : public AzureFileSystemTest {
-  Result<AzureOptions> MakeOptions() override {
-    EXPECT_THAT(GetAzuriteEnv(), NotNull());
-    ARROW_EXPECT_OK(GetAzuriteEnv()->status());
-    ARROW_ASSIGN_OR_RAISE(debug_log_start_, 
GetAzuriteEnv()->GetDebugLogSize());
-    AzureOptions options;
-    options.backend = AzureBackend::Azurite;
-    ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(
-        GetAzuriteEnv()->account_name(), GetAzuriteEnv()->account_key()));
-    return options;
+  bool WithHierarchicalNamespace() const {
+    EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+    return env->WithHierarchicalNamespace();
   }
 
-  void TearDown() override {
-    AzureFileSystemTest::TearDown();
-    if (HasFailure()) {
-      // XXX: This may not include all logs in the target test because
-      // Azurite doesn't flush debug logs immediately... You may want
-      // to check the log manually...
-      ARROW_IGNORE_EXPR(GetAzuriteEnv()->DumpDebugLog(debug_log_start_));
+  // Tests that are called from more than one implementation of 
AzureFileSystemTest
+
+  void DetectHierarchicalNamespaceTest();
+  void GetFileInfoObjectTest();
+  void GetFileInfoObjectWithNestedStructureTest();
+
+  void DeleteDirSuccessEmptyTest() {
+    auto data = SetUpPreexistingData();
+    const auto directory_path =
+        internal::ConcatAbstractPath(data.container_name, 
data.RandomDirectoryName(rng_));
+
+    if (WithHierarchicalNamespace()) {
+      ASSERT_OK(fs_->CreateDir(directory_path, true));
+      arrow::fs::AssertFileInfo(fs_.get(), directory_path, 
FileType::Directory);
+      ASSERT_OK(fs_->DeleteDir(directory_path));
+      arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
+    } else {
+      // There is only virtual directory without hierarchical namespace
+      // support. So the CreateDir() and DeleteDir() do nothing.
+      ASSERT_OK(fs_->CreateDir(directory_path));
+      arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
+      ASSERT_OK(fs_->DeleteDir(directory_path));
+      arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
     }
   }
 
-  int64_t debug_log_start_ = 0;
-};
+  void CreateDirSuccessContainerAndDirectoryTest() {
+    auto data = SetUpPreexistingData();
+    const auto path = data.RandomDirectoryPath(rng_);
+    ASSERT_OK(fs_->CreateDir(path, false));
+    if (WithHierarchicalNamespace()) {
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::Directory);
+    } else {
+      // There is only virtual directory without hierarchical namespace
+      // support. So the CreateDir() does nothing.
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound);
+    }
+  }
 
-class AzureFlatNamespaceFileSystemTest : public AzureFileSystemTest {
-  Result<AzureOptions> MakeOptions() override {
-    AzureOptions options;
-    const auto account_key = std::getenv("AZURE_FLAT_NAMESPACE_ACCOUNT_KEY");
-    const auto account_name = std::getenv("AZURE_FLAT_NAMESPACE_ACCOUNT_NAME");
-    if (account_key && account_name) {
-      RETURN_NOT_OK(options.ConfigureAccountKeyCredentials(account_name, 
account_key));
-      return options;
+  void CreateDirRecursiveSuccessContainerOnlyTest() {
+    auto container_name = PreexistingData::RandomContainerName(rng_);
+    ASSERT_OK(fs_->CreateDir(container_name, true));
+    arrow::fs::AssertFileInfo(fs_.get(), container_name, FileType::Directory);
+  }
+
+  void CreateDirRecursiveSuccessDirectoryOnlyTest() {
+    auto data = SetUpPreexistingData();
+    const auto parent = data.RandomDirectoryPath(rng_);
+    const auto path = internal::ConcatAbstractPath(parent, "new-sub");
+    ASSERT_OK(fs_->CreateDir(path, true));
+    if (WithHierarchicalNamespace()) {
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::Directory);
+      arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::Directory);
+    } else {
+      // There is only virtual directory without hierarchical namespace
+      // support. So the CreateDir() does nothing.
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound);
+      arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::NotFound);
     }
-    return Status::Cancelled(
-        "Connection details not provided for a real flat namespace "
-        "account.");
   }
-};
 
-// How to enable this test:
-//
-// You need an Azure account. You should be able to create a free
-// account at https://azure.microsoft.com/en-gb/free/ . You should be
-// able to create a storage account through the portal Web UI.
-//
-// See also the official document how to create a storage account:
-// 
https://learn.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account
-//
-// A few suggestions on configuration:
-//
-// * Use Standard general-purpose v2 not premium
-// * Use LRS redundancy
-// * Obviously you need to enable hierarchical namespace.
-// * Set the default access tier to hot
-// * SFTP, NFS and file shares are not required.
-class AzureHierarchicalNamespaceFileSystemTest : public AzureFileSystemTest {
-  Result<AzureOptions> MakeOptions() override {
-    AzureOptions options;
-    const auto account_key = 
std::getenv("AZURE_HIERARCHICAL_NAMESPACE_ACCOUNT_KEY");
-    const auto account_name = 
std::getenv("AZURE_HIERARCHICAL_NAMESPACE_ACCOUNT_NAME");
-    if (account_key && account_name) {
-      RETURN_NOT_OK(options.ConfigureAccountKeyCredentials(account_name, 
account_key));
-      return options;
+  void CreateDirRecursiveSuccessContainerAndDirectoryTest() {
+    auto data = SetUpPreexistingData();
+    const auto parent = data.RandomDirectoryPath(rng_);
+    const auto path = internal::ConcatAbstractPath(parent, "new-sub");
+    ASSERT_OK(fs_->CreateDir(path, true));
+    if (WithHierarchicalNamespace()) {
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::Directory);
+      arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::Directory);
+      arrow::fs::AssertFileInfo(fs_.get(), data.container_name, 
FileType::Directory);
+    } else {
+      // There is only virtual directory without hierarchical namespace
+      // support. So the CreateDir() does nothing.
+      arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound);
+      arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::NotFound);
+      arrow::fs::AssertFileInfo(fs_.get(), data.container_name, 
FileType::Directory);
     }
-    return Status::Cancelled(
-        "Connection details not provided for a real hierarchical namespace "
-        "account.");
   }
-};
 
-TEST_F(AzureFlatNamespaceFileSystemTest, DetectHierarchicalNamespace) {
-  auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
-  ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
-  ASSERT_OK_AND_EQ(false, 
hierarchical_namespace.Enabled(PreexistingContainerName()));
-}
+  void DeleteDirContentsSuccessNonexistentTest() {
+    auto data = SetUpPreexistingData();
+    const auto directory_path = data.RandomDirectoryPath(rng_);
+    ASSERT_OK(fs_->DeleteDirContents(directory_path, true));
+    arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
+  }
 
-TEST_F(AzureHierarchicalNamespaceFileSystemTest, DetectHierarchicalNamespace) {
-  auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
-  ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
-  ASSERT_OK_AND_EQ(true, 
hierarchical_namespace.Enabled(PreexistingContainerName()));
-}
+  void DeleteDirContentsFailureNonexistentTest() {
+    auto data = SetUpPreexistingData();
+    const auto directory_path = data.RandomDirectoryPath(rng_);
+    ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false));
+  }
+};
 
-TEST_F(AzuriteFileSystemTest, DetectHierarchicalNamespace) {
-  auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
-  ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
-  ASSERT_OK_AND_EQ(false, 
hierarchical_namespace.Enabled(PreexistingContainerName()));
-}
+void AzureFileSystemTest::DetectHierarchicalNamespaceTest() {
+  // Check the environments are implemented and injected here correctly.
+  auto expected = WithHierarchicalNamespace();
 
-TEST_F(AzuriteFileSystemTest, 
DetectHierarchicalNamespaceFailsWithMissingContainer) {
+  auto data = SetUpPreexistingData();
   auto hierarchical_namespace = internal::HierarchicalNamespaceDetector();
   ASSERT_OK(hierarchical_namespace.Init(datalake_service_client_.get()));
-  ASSERT_NOT_OK(hierarchical_namespace.Enabled("nonexistent-container"));
+  ASSERT_OK_AND_EQ(expected, 
hierarchical_namespace.Enabled(data.container_name));
 }
 
-TEST_F(AzuriteFileSystemTest, GetFileInfoAccount) {
-  AssertFileInfo(fs_.get(), "", FileType::Directory);
-
-  // URI
-  ASSERT_RAISES(Invalid, fs_->GetFileInfo("abfs://"));
-}
-
-TEST_F(AzuriteFileSystemTest, GetFileInfoContainer) {
-  AssertFileInfo(fs_.get(), PreexistingContainerName(), FileType::Directory);
+void AzureFileSystemTest::GetFileInfoObjectTest() {
+  auto data = SetUpPreexistingData();
+  auto object_properties =
+      blob_service_client_->GetBlobContainerClient(data.container_name)
+          .GetBlobClient(data.kObjectName)
+          .GetProperties()
+          .Value;
 
-  AssertFileInfo(fs_.get(), "nonexistent-container", FileType::NotFound);
+  AssertFileInfo(fs_.get(), data.ObjectPath(), FileType::File,
+                 
std::chrono::system_clock::time_point{object_properties.LastModified},
+                 static_cast<int64_t>(object_properties.BlobSize));
 
   // URI
-  ASSERT_RAISES(Invalid, fs_->GetFileInfo("abfs://" + 
PreexistingContainerName()));
+  ASSERT_RAISES(Invalid, fs_->GetFileInfo("abfs://" + 
std::string{data.kObjectName}));
 }
 
-void AzureFileSystemTest::RunGetFileInfoObjectWithNestedStructureTest() {
+void AzureFileSystemTest::GetFileInfoObjectWithNestedStructureTest() {
+  auto data = SetUpPreexistingData();
   // Adds detailed tests to handle cases of different edge cases
   // with directory naming conventions (e.g. with and without slashes).
   constexpr auto kObjectName = 
"test-object-dir/some_other_dir/another_dir/foo";
   ASSERT_OK_AND_ASSIGN(
       auto output,
-      fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName, 
/*metadata=*/{}));
-  const std::string_view data(kLoremIpsum);
-  ASSERT_OK(output->Write(data));
+      fs_->OpenOutputStream(data.ContainerPath() + kObjectName, 
/*metadata=*/{}));
+  const std::string_view lorem_ipsum(PreexistingData::kLoremIpsum);
+  ASSERT_OK(output->Write(lorem_ipsum));
   ASSERT_OK(output->Close());
 
   // 0 is immediately after "/" lexicographically, ensure that this doesn't
   // cause unexpected issues.
-  ASSERT_OK_AND_ASSIGN(output,
-                       fs_->OpenOutputStream(
-                           PreexistingContainerPath() + 
"test-object-dir/some_other_dir0",
-                           /*metadata=*/{}));
-  ASSERT_OK(output->Write(data));
-  ASSERT_OK(output->Close());
   ASSERT_OK_AND_ASSIGN(
-      output, fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName + 
"0",
-                                    /*metadata=*/{}));
-  ASSERT_OK(output->Write(data));
+      output,
+      fs_->OpenOutputStream(data.ContainerPath() + 
"test-object-dir/some_other_dir0",
+                            /*metadata=*/{}));
+  ASSERT_OK(output->Write(lorem_ipsum));
+  ASSERT_OK(output->Close());
+  ASSERT_OK_AND_ASSIGN(output,
+                       fs_->OpenOutputStream(data.ContainerPath() + 
kObjectName + "0",
+                                             /*metadata=*/{}));
+  ASSERT_OK(output->Write(lorem_ipsum));
   ASSERT_OK(output->Close());
 
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName, 
FileType::File);
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName + "/",
-                 FileType::NotFound);
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-dir",
+  AssertFileInfo(fs_.get(), data.ContainerPath() + kObjectName, 
FileType::File);
+  AssertFileInfo(fs_.get(), data.ContainerPath() + kObjectName + "/", 
FileType::NotFound);
+  AssertFileInfo(fs_.get(), data.ContainerPath() + "test-object-dir",
                  FileType::Directory);
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-dir/",
+  AssertFileInfo(fs_.get(), data.ContainerPath() + "test-object-dir/",
                  FileType::Directory);
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + 
"test-object-dir/some_other_dir",
+  AssertFileInfo(fs_.get(), data.ContainerPath() + 
"test-object-dir/some_other_dir",
                  FileType::Directory);
-  AssertFileInfo(fs_.get(),
-                 PreexistingContainerPath() + 
"test-object-dir/some_other_dir/",
+  AssertFileInfo(fs_.get(), data.ContainerPath() + 
"test-object-dir/some_other_dir/",
                  FileType::Directory);
 
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + "test-object-di",
-                 FileType::NotFound);
-  AssertFileInfo(fs_.get(), PreexistingContainerPath() + 
"test-object-dir/some_other_di",
+  AssertFileInfo(fs_.get(), data.ContainerPath() + "test-object-di", 
FileType::NotFound);
+  AssertFileInfo(fs_.get(), data.ContainerPath() + 
"test-object-dir/some_other_di",
                  FileType::NotFound);
+
+  if (WithHierarchicalNamespace()) {
+    datalake_service_client_->GetFileSystemClient(data.container_name)
+        .GetDirectoryClient("test-empty-object-dir")
+        .Create();
+
+    AssertFileInfo(fs_.get(), data.ContainerPath() + "test-empty-object-dir",
+                   FileType::Directory);
+  }
 }
 
-TEST_F(AzuriteFileSystemTest, GetFileInfoObjectWithNestedStructure) {
-  RunGetFileInfoObjectWithNestedStructureTest();
+template <class AzureEnvClass>
+class AzureFileSystemTestImpl : public AzureFileSystemTest {
+ public:
+  using AzureFileSystemTest::AzureFileSystemTest;
+
+  Result<BaseAzureEnv*> GetAzureEnv() const final { return 
AzureEnvClass::GetInstance(); }
+};
+
+// How to enable the non-Azurite tests:

Review Comment:
   Thanks for these explanations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to