This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7da78951e8 GH-38700: [C++][FS][Azure] Implement `DeleteDir()` (#38793)
7da78951e8 is described below

commit 7da78951e879fc64bdb8d7763d78cc31e6dc13c3
Author: Sutou Kouhei <[email protected]>
AuthorDate: Sat Nov 25 06:58:39 2023 +0900

    GH-38700: [C++][FS][Azure] Implement `DeleteDir()` (#38793)
    
    ### Rationale for this change
    
    `DeleteDir()` deletes the given directory recursively, like other filesystem
    implementations.
    
    ### What changes are included in this PR?
    
    * A container can be deleted with or without hierarchical namespace support.
    * A directory can be deleted with hierarchical namespace support.
    * Without hierarchical namespace support, a directory is only virtual and
      can't be deleted directly. Instead, all blobs under the given path are
      deleted, which also removes the virtual directory.
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    Yes.
    * Closes: #38700
    
    Authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/arrow/filesystem/azurefs.cc      | 116 +++++++++++++++++++++-
 cpp/src/arrow/filesystem/azurefs_test.cc | 164 ++++++++++++++++++++++++++++++-
 2 files changed, 275 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc
index 2c3d81ca24..4dde275da1 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -969,6 +969,119 @@ class AzureFileSystem::Impl {
     RETURN_NOT_OK(stream->Init());
     return stream;
   }
+
+  Status DeleteDir(const AzureLocation& location) {
+    if (location.container.empty()) {
+      return Status::Invalid("Cannot delete an empty container");
+    }
+
+    if (location.path.empty()) {
+      auto container_client =
+          blob_service_client_->GetBlobContainerClient(location.container);
+      try {
+        auto response = container_client.Delete();
+        if (response.Value.Deleted) {
+          return Status::OK();
+        } else {
+          return StatusFromErrorResponse(
+              container_client.GetUrl(), response.RawResponse.get(),
+              "Failed to delete a container: " + location.container);
+        }
+      } catch (const Azure::Storage::StorageException& exception) {
+        return internal::ExceptionToStatus(
+            "Failed to delete a container: " + location.container + ": " +
+                container_client.GetUrl(),
+            exception);
+      }
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
+                          hierarchical_namespace_.Enabled(location.container));
+    if (hierarchical_namespace_enabled) {
+      auto directory_client =
+          datalake_service_client_->GetFileSystemClient(location.container)
+              .GetDirectoryClient(location.path);
+      try {
+        auto response = directory_client.DeleteRecursive();
+        if (response.Value.Deleted) {
+          return Status::OK();
+        } else {
+          return StatusFromErrorResponse(
+              directory_client.GetUrl(), response.RawResponse.get(),
+              "Failed to delete a directory: " + location.path);
+        }
+      } catch (const Azure::Storage::StorageException& exception) {
+        return internal::ExceptionToStatus(
+            "Failed to delete a directory: " + location.path + ": " +
+                directory_client.GetUrl(),
+            exception);
+      }
+    } else {
+      auto container_client =
+          blob_service_client_->GetBlobContainerClient(location.container);
+      Azure::Storage::Blobs::ListBlobsOptions options;
+      options.Prefix = internal::EnsureTrailingSlash(location.path);
+      // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
+      //
+      // Only supports up to 256 subrequests in a single batch. The
+      // size of the body for a batch request can't exceed 4 MB.
+      const int32_t kNumMaxRequestsInBatch = 256;
+      options.PageSizeHint = kNumMaxRequestsInBatch;
+      try {
+        auto list_response = container_client.ListBlobs(options);
+        while (list_response.HasPage() && !list_response.Blobs.empty()) {
+          auto batch = container_client.CreateBatch();
+          std::vector<Azure::Storage::DeferredResponse<
+              Azure::Storage::Blobs::Models::DeleteBlobResult>>
+              deferred_responses;
+          for (const auto& blob_item : list_response.Blobs) {
+            deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
+          }
+          try {
+            container_client.SubmitBatch(batch);
+          } catch (const Azure::Storage::StorageException& exception) {
+            return internal::ExceptionToStatus(
+                "Failed to delete blobs in a directory: " + location.path + ": " +
+                    container_client.GetUrl(),
+                exception);
+          }
+          std::vector<std::string> failed_blob_names;
+          for (size_t i = 0; i < deferred_responses.size(); ++i) {
+            const auto& deferred_response = deferred_responses[i];
+            bool success = true;
+            try {
+              auto delete_result = deferred_response.GetResponse();
+              success = delete_result.Value.Deleted;
+            } catch (const Azure::Storage::StorageException&) {
+              success = false;
+            }
+            if (!success) {
+              const auto& blob_item = list_response.Blobs[i];
+              failed_blob_names.push_back(blob_item.Name);
+            }
+          }
+          if (!failed_blob_names.empty()) {
+            if (failed_blob_names.size() == 1) {
+              return Status::IOError("Failed to delete a blob: ", failed_blob_names[0],
+                                     ": " + container_client.GetUrl());
+            } else {
+              return Status::IOError(
+                  "Failed to delete blobs: [",
+                  arrow::internal::JoinStrings(failed_blob_names, ", "),
+                  "]: " + container_client.GetUrl());
+            }
+          }
+          list_response.MoveToNextPage();
+        }
+      } catch (const Azure::Storage::StorageException& exception) {
+        return internal::ExceptionToStatus(
+            "Failed to list blobs in a directory: " + location.path + ": " +
+                container_client.GetUrl(),
+            exception);
+      }
+      return Status::OK();
+    }
+  }
 };
 
 const AzureOptions& AzureFileSystem::options() const { return impl_->options(); }
@@ -1003,7 +1116,8 @@ Status AzureFileSystem::CreateDir(const std::string& path, bool recursive) {
 }
 
 Status AzureFileSystem::DeleteDir(const std::string& path) {
-  return Status::NotImplemented("The Azure FileSystem is not fully implemented");
+  ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
+  return impl_->DeleteDir(location);
 }
 
 Status AzureFileSystem::DeleteDirContents(const std::string& path, bool 
missing_dir_ok) {
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc
index e9b9a6f34b..7c86385126 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -56,6 +56,7 @@
 #include "arrow/testing/util.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/string.h"
 #include "arrow/util/value_parsing.h"
 
@@ -92,9 +93,15 @@ class AzuriteEnv : public ::testing::Environment {
       return;
     }
     auto temp_dir_ = *TemporaryDir::Make("azurefs-test-");
-    server_process_ = bp::child(boost::this_process::environment(), exe_path, 
"--silent",
-                                "--location", temp_dir_->path().ToString(), 
"--debug",
-                                temp_dir_->path().ToString() + "/debug.log");
+    auto debug_log_path_result = temp_dir_->path().Join("debug.log");
+    if (!debug_log_path_result.ok()) {
+      status_ = debug_log_path_result.status();
+      return;
+    }
+    debug_log_path_ = *debug_log_path_result;
+    server_process_ =
+        bp::child(boost::this_process::environment(), exe_path, "--silent", 
"--location",
+                  temp_dir_->path().ToString(), "--debug", 
debug_log_path_.ToString());
     if (!(server_process_.valid() && server_process_.running())) {
       auto error = "Could not start Azurite emulator.";
       server_process_.terminate();
@@ -110,6 +117,44 @@ class AzuriteEnv : public ::testing::Environment {
     server_process_.wait();
   }
 
+  Result<int64_t> GetDebugLogSize() {
+    ARROW_ASSIGN_OR_RAISE(auto exists, arrow::internal::FileExists(debug_log_path_));
+    if (!exists) {
+      return 0;
+    }
+    ARROW_ASSIGN_OR_RAISE(auto file_descriptor,
+                          arrow::internal::FileOpenReadable(debug_log_path_));
+    ARROW_RETURN_NOT_OK(arrow::internal::FileSeek(file_descriptor.fd(), 0, SEEK_END));
+    return arrow::internal::FileTell(file_descriptor.fd());
+  }
+
+  Status DumpDebugLog(int64_t position = 0) {
+    ARROW_ASSIGN_OR_RAISE(auto exists, arrow::internal::FileExists(debug_log_path_));
+    if (!exists) {
+      return Status::OK();
+    }
+    ARROW_ASSIGN_OR_RAISE(auto file_descriptor,
+                          arrow::internal::FileOpenReadable(debug_log_path_));
+    if (position > 0) {
+      ARROW_RETURN_NOT_OK(arrow::internal::FileSeek(file_descriptor.fd(), position));
+    }
+    std::vector<uint8_t> buffer;
+    const int64_t buffer_size = 4096;
+    buffer.resize(buffer_size);  // FileRead() writes into [data(), data() + size)
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto n_read_bytes,
+          arrow::internal::FileRead(file_descriptor.fd(), buffer.data(), buffer_size));
+      if (n_read_bytes <= 0) {
+        break;
+      }
+      std::cerr << std::string_view(reinterpret_cast<const char*>(buffer.data()),
+                                    n_read_bytes);
+    }
+    std::cerr << std::endl;
+    return Status::OK();
+  }
+
   const std::string& account_name() const { return account_name_; }
   const std::string& account_key() const { return account_key_; }
   const Status status() const { return status_; }
@@ -120,6 +165,7 @@ class AzuriteEnv : public ::testing::Environment {
   bp::child server_process_;
   Status status_;
   std::unique_ptr<TemporaryDir> temp_dir_;
+  arrow::internal::PlatformFilename debug_log_path_;
 };
 
 auto* azurite_env = ::testing::AddGlobalTestEnvironment(new AzuriteEnv);
@@ -244,15 +290,28 @@ class AzureFileSystemTest : public ::testing::Test {
 };
 
 class AzuriteFileSystemTest : public AzureFileSystemTest {
-  Result<AzureOptions> MakeOptions() {
+  Result<AzureOptions> MakeOptions() override {
     EXPECT_THAT(GetAzuriteEnv(), NotNull());
     ARROW_EXPECT_OK(GetAzuriteEnv()->status());
+    ARROW_ASSIGN_OR_RAISE(debug_log_start_, GetAzuriteEnv()->GetDebugLogSize());
     AzureOptions options;
     options.backend = AzureBackend::Azurite;
     ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(
         GetAzuriteEnv()->account_name(), GetAzuriteEnv()->account_key()));
     return options;
   }
+
+  void TearDown() override {
+    AzureFileSystemTest::TearDown();
+    if (HasFailure()) {
+      // XXX: This may not include all logs in the target test because
+      // Azurite doesn't flush debug logs immediately... You may want
+      // to check the log manually...
+      ARROW_IGNORE_EXPR(GetAzuriteEnv()->DumpDebugLog(debug_log_start_));
+    }
+  }
+
+  int64_t debug_log_start_ = 0;
 };
 
 class AzureFlatNamespaceFileSystemTest : public AzureFileSystemTest {
@@ -510,6 +569,103 @@ TEST_F(AzuriteFileSystemTest, CreateDirUri) {
   ASSERT_RAISES(Invalid, fs_->CreateDir("abfs://" + RandomContainerName(), 
true));
 }
 
+TEST_F(AzuriteFileSystemTest, DeleteDirSuccessContainer) {
+  const auto container = RandomContainerName();  // a fresh top-level directory
+  ASSERT_OK(fs_->CreateDir(container));
+  arrow::fs::AssertFileInfo(fs_.get(), container, FileType::Directory);
+  ASSERT_OK(fs_->DeleteDir(container));
+  arrow::fs::AssertFileInfo(fs_.get(), container, FileType::NotFound);
+}
+
+TEST_F(AzuriteFileSystemTest, DeleteDirSuccessEmpty) {
+  const auto directory_path =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  // Without hierarchical namespace support, directories are only
+  // virtual. So CreateDir() and DeleteDir() do nothing here.
+  ASSERT_OK(fs_->CreateDir(directory_path));
+  arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
+  ASSERT_OK(fs_->DeleteDir(directory_path));
+  arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
+}
+
+TEST_F(AzuriteFileSystemTest, DeleteDirSuccessNonexistent) {
+  const auto directory_path =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  // Without hierarchical namespace support, directories are only
+  // virtual. So DeleteDir() for a nonexistent directory does nothing.
+  ASSERT_OK(fs_->DeleteDir(directory_path));
+  arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
+}
+
+TEST_F(AzuriteFileSystemTest, DeleteDirSuccessHaveBlobs) {
+#ifdef __APPLE__
+  GTEST_SKIP() << "This test fails by an Azurite problem: "
+                  "https://github.com/Azure/Azurite/pull/2302";
+#endif
+  const auto directory_path =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  // Use more than 256 blobs so that DeleteDir() has to paginate
+  // ListBlobs(): one SubmitBatch() accepts at most 256 delete requests.
+  const int64_t n_blobs = 257;
+  for (int64_t i = 0; i < n_blobs; ++i) {
+    const auto blob_path =
+        internal::ConcatAbstractPath(directory_path, std::to_string(i) + ".txt");
+    ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(blob_path));
+    ASSERT_OK(output->Write(std::string_view(std::to_string(i))));
+    ASSERT_OK(output->Close());
+    arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::File);
+  }
+  ASSERT_OK(fs_->DeleteDir(directory_path));
+  for (int64_t i = 0; i < n_blobs; ++i) {
+    const auto blob_path =
+        internal::ConcatAbstractPath(directory_path, std::to_string(i) + ".txt");
+    arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::NotFound);
+  }
+}
+
+TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessEmpty) {
+  const auto directory_path =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  ASSERT_OK(fs_->CreateDir(directory_path, true));
+  arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::Directory);
+  ASSERT_OK(fs_->DeleteDir(directory_path));
+  arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
+}
+
+TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirFailureNonexistent) {
+  const auto path =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  ASSERT_RAISES(IOError, fs_->DeleteDir(path));
+}
+
+TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveBlob) {
+  const auto directory_path =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  const auto blob_path = internal::ConcatAbstractPath(directory_path, "hello.txt");
+  ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(blob_path));
+  ASSERT_OK(output->Write(std::string_view("hello")));
+  ASSERT_OK(output->Close());
+  arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::File);
+  ASSERT_OK(fs_->DeleteDir(directory_path));
+  arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::NotFound);
+}
+
+TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveDirectory) {
+  const auto parent =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  const auto path = internal::ConcatAbstractPath(parent, "new-sub");
+  ASSERT_OK(fs_->CreateDir(path, true));
+  arrow::fs::AssertFileInfo(fs_.get(), path, FileType::Directory);
+  arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::Directory);
+  ASSERT_OK(fs_->DeleteDir(parent));
+  arrow::fs::AssertFileInfo(fs_.get(), path, FileType::NotFound);
+  arrow::fs::AssertFileInfo(fs_.get(), parent, FileType::NotFound);
+}
+
+TEST_F(AzuriteFileSystemTest, DeleteDirUri) {
+  ASSERT_RAISES(Invalid, fs_->DeleteDir("abfs://" + PreexistingContainerPath()));
+}
+
 TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
   std::shared_ptr<io::InputStream> stream;
   ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));

Reply via email to