This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 7da78951e8 GH-38700: [C++][FS][Azure] Implement `DeleteDir()` (#38793)
7da78951e8 is described below
commit 7da78951e879fc64bdb8d7763d78cc31e6dc13c3
Author: Sutou Kouhei <[email protected]>
AuthorDate: Sat Nov 25 06:58:39 2023 +0900
GH-38700: [C++][FS][Azure] Implement `DeleteDir()` (#38793)
### Rationale for this change
`DeleteDir()` deletes the given directory recursively, as other filesystem
implementations do.
### What changes are included in this PR?
* A container can be deleted with or without hierarchical namespace support.
* A directory can be deleted when hierarchical namespace support is enabled.
* Without hierarchical namespace support, a directory is only virtual and
cannot be deleted directly. Instead, all blobs under the given path are
deleted, which also removes the virtual directory.
### Are these changes tested?
Yes.
### Are there any user-facing changes?
Yes.
* Closes: #38700
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/arrow/filesystem/azurefs.cc | 116 +++++++++++++++++++++-
cpp/src/arrow/filesystem/azurefs_test.cc | 164 ++++++++++++++++++++++++++++++-
2 files changed, 275 insertions(+), 5 deletions(-)
diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc
index 2c3d81ca24..4dde275da1 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -969,6 +969,119 @@ class AzureFileSystem::Impl {
RETURN_NOT_OK(stream->Init());
return stream;
}
+
+  // Delete the container or directory identified by `location`, recursively.
+  //
+  // * An empty path deletes the whole container.
+  // * With hierarchical namespace support, the directory is deleted
+  //   recursively through the Data Lake directory client.
+  // * Without it, directories are only virtual: every blob whose name starts
+  //   with "<path>/" is deleted through batched blob-delete requests.
+  //
+  // Returns Invalid when no container name is given; otherwise any Azure
+  // failure is converted to an error Status via StatusFromErrorResponse() /
+  // internal::ExceptionToStatus().
+  Status DeleteDir(const AzureLocation& location) {
+    if (location.container.empty()) {
+      return Status::Invalid("Cannot delete the root directory: the container name is empty");
+    }
+
+    if (location.path.empty()) {
+      // No path inside the container: delete the container itself.
+      auto container_client =
+          blob_service_client_->GetBlobContainerClient(location.container);
+      try {
+        auto response = container_client.Delete();
+        if (response.Value.Deleted) {
+          return Status::OK();
+        }
+        return StatusFromErrorResponse(
+            container_client.GetUrl(), response.RawResponse.get(),
+            "Failed to delete a container: " + location.container);
+      } catch (const Azure::Storage::StorageException& exception) {
+        return internal::ExceptionToStatus(
+            "Failed to delete a container: " + location.container + ": " +
+                container_client.GetUrl(),
+            exception);
+      }
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
+                          hierarchical_namespace_.Enabled(location.container));
+    if (hierarchical_namespace_enabled) {
+      // Directories are real objects here; the service deletes the subtree.
+      auto directory_client =
+          datalake_service_client_->GetFileSystemClient(location.container)
+              .GetDirectoryClient(location.path);
+      try {
+        auto response = directory_client.DeleteRecursive();
+        if (response.Value.Deleted) {
+          return Status::OK();
+        }
+        return StatusFromErrorResponse(
+            directory_client.GetUrl(), response.RawResponse.get(),
+            "Failed to delete a directory: " + location.path);
+      } catch (const Azure::Storage::StorageException& exception) {
+        return internal::ExceptionToStatus(
+            "Failed to delete a directory: " + location.path + ": " +
+                directory_client.GetUrl(),
+            exception);
+      }
+    }
+
+    // Flat namespace: delete every blob under the virtual directory.
+    auto container_client =
+        blob_service_client_->GetBlobContainerClient(location.container);
+    Azure::Storage::Blobs::ListBlobsOptions options;
+    // The trailing slash keeps e.g. "dir" from also matching "dir2/...".
+    options.Prefix = internal::EnsureTrailingSlash(location.path);
+    // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
+    //
+    // Only supports up to 256 subrequests in a single batch. The
+    // size of the body for a batch request can't exceed 4 MB.
+    // Listing at most that many blobs per page lets one page map to one batch.
+    constexpr int32_t kNumMaxRequestsInBatch = 256;
+    options.PageSizeHint = kNumMaxRequestsInBatch;
+    try {
+      // Termination is driven by HasPage() (i.e. the continuation token), not
+      // by page contents: a page with no blobs is skipped instead of ending
+      // the listing early, and it is never submitted as an empty batch.
+      for (auto list_response = container_client.ListBlobs(options);
+           list_response.HasPage(); list_response.MoveToNextPage()) {
+        if (list_response.Blobs.empty()) {
+          continue;
+        }
+        auto batch = container_client.CreateBatch();
+        std::vector<Azure::Storage::DeferredResponse<
+            Azure::Storage::Blobs::Models::DeleteBlobResult>>
+            deferred_responses;
+        deferred_responses.reserve(list_response.Blobs.size());
+        for (const auto& blob_item : list_response.Blobs) {
+          deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
+        }
+        try {
+          container_client.SubmitBatch(batch);
+        } catch (const Azure::Storage::StorageException& exception) {
+          return internal::ExceptionToStatus(
+              "Failed to delete blobs in a directory: " + location.path + ": " +
+                  container_client.GetUrl(),
+              exception);
+        }
+        // Check each subrequest's own result; deferred_responses[i]
+        // corresponds to list_response.Blobs[i].
+        std::vector<std::string> failed_blob_names;
+        for (size_t i = 0; i < deferred_responses.size(); ++i) {
+          bool success = false;
+          try {
+            success = deferred_responses[i].GetResponse().Value.Deleted;
+          } catch (const Azure::Storage::StorageException&) {
+            success = false;
+          }
+          if (!success) {
+            failed_blob_names.push_back(list_response.Blobs[i].Name);
+          }
+        }
+        if (!failed_blob_names.empty()) {
+          if (failed_blob_names.size() == 1) {
+            return Status::IOError("Failed to delete a blob: ", failed_blob_names[0],
+                                   ": " + container_client.GetUrl());
+          }
+          return Status::IOError("Failed to delete blobs: [",
+                                 arrow::internal::JoinStrings(failed_blob_names, ", "),
+                                 "]: " + container_client.GetUrl());
+        }
+      }
+    } catch (const Azure::Storage::StorageException& exception) {
+      return internal::ExceptionToStatus(
+          "Failed to list blobs in a directory: " + location.path + ": " +
+              container_client.GetUrl(),
+          exception);
+    }
+    return Status::OK();
+  }
};
const AzureOptions& AzureFileSystem::options() const { return
impl_->options(); }
@@ -1003,7 +1116,8 @@ Status AzureFileSystem::CreateDir(const std::string& path, bool recursive) {
}
Status AzureFileSystem::DeleteDir(const std::string& path) {
- return Status::NotImplemented("The Azure FileSystem is not fully
implemented");
+ ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
+ return impl_->DeleteDir(location);
}
Status AzureFileSystem::DeleteDirContents(const std::string& path, bool
missing_dir_ok) {
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc
index e9b9a6f34b..7c86385126 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -56,6 +56,7 @@
#include "arrow/testing/util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
#include "arrow/util/string.h"
#include "arrow/util/value_parsing.h"
@@ -92,9 +93,15 @@ class AzuriteEnv : public ::testing::Environment {
return;
}
auto temp_dir_ = *TemporaryDir::Make("azurefs-test-");
- server_process_ = bp::child(boost::this_process::environment(), exe_path,
"--silent",
- "--location", temp_dir_->path().ToString(),
"--debug",
- temp_dir_->path().ToString() + "/debug.log");
+ auto debug_log_path_result = temp_dir_->path().Join("debug.log");
+ if (!debug_log_path_result.ok()) {
+ status_ = debug_log_path_result.status();
+ return;
+ }
+ debug_log_path_ = *debug_log_path_result;
+ server_process_ =
+ bp::child(boost::this_process::environment(), exe_path, "--silent",
"--location",
+ temp_dir_->path().ToString(), "--debug",
debug_log_path_.ToString());
if (!(server_process_.valid() && server_process_.running())) {
auto error = "Could not start Azurite emulator.";
server_process_.terminate();
@@ -110,6 +117,44 @@ class AzuriteEnv : public ::testing::Environment {
server_process_.wait();
}
+  // Return the current size in bytes of Azurite's debug log, or 0 when the
+  // log file does not exist.
+  Result<int64_t> GetDebugLogSize() {
+    ARROW_ASSIGN_OR_RAISE(const bool log_exists,
+                          arrow::internal::FileExists(debug_log_path_));
+    if (!log_exists) {
+      return 0;
+    }
+    ARROW_ASSIGN_OR_RAISE(auto log_file,
+                          arrow::internal::FileOpenReadable(debug_log_path_));
+    // After seeking to the end, the reported offset equals the file size.
+    const int fd = log_file.fd();
+    ARROW_RETURN_NOT_OK(arrow::internal::FileSeek(fd, 0, SEEK_END));
+    return arrow::internal::FileTell(fd);
+  }
+
+  // Copy Azurite's debug log to stderr, starting at byte offset `position`
+  // (from the beginning when `position` <= 0). Does nothing if the log file
+  // does not exist.
+  Status DumpDebugLog(int64_t position = 0) {
+    ARROW_ASSIGN_OR_RAISE(auto exists, arrow::internal::FileExists(debug_log_path_));
+    if (!exists) {
+      return Status::OK();
+    }
+    ARROW_ASSIGN_OR_RAISE(auto file_descriptor,
+                          arrow::internal::FileOpenReadable(debug_log_path_));
+    if (position > 0) {
+      ARROW_RETURN_NOT_OK(arrow::internal::FileSeek(file_descriptor.fd(), position));
+    }
+    // The buffer must be *sized*, not merely reserved: FileRead() writes up to
+    // kBufferSize bytes through buffer.data(), and writing past size() is
+    // undefined behavior.
+    constexpr int64_t kBufferSize = 4096;
+    std::vector<uint8_t> buffer(static_cast<size_t>(kBufferSize));
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto n_read_bytes,
+          arrow::internal::FileRead(file_descriptor.fd(), buffer.data(), kBufferSize));
+      if (n_read_bytes <= 0) {
+        break;
+      }
+      std::cerr << std::string_view(reinterpret_cast<const char*>(buffer.data()),
+                                    n_read_bytes);
+    }
+    std::cerr << std::endl;
+    return Status::OK();
+  }
+
const std::string& account_name() const { return account_name_; }
const std::string& account_key() const { return account_key_; }
const Status status() const { return status_; }
@@ -120,6 +165,7 @@ class AzuriteEnv : public ::testing::Environment {
bp::child server_process_;
Status status_;
std::unique_ptr<TemporaryDir> temp_dir_;
+ arrow::internal::PlatformFilename debug_log_path_;
};
auto* azurite_env = ::testing::AddGlobalTestEnvironment(new AzuriteEnv);
@@ -244,15 +290,28 @@ class AzureFileSystemTest : public ::testing::Test {
};
class AzuriteFileSystemTest : public AzureFileSystemTest {
- Result<AzureOptions> MakeOptions() {
+ Result<AzureOptions> MakeOptions() override {
EXPECT_THAT(GetAzuriteEnv(), NotNull());
ARROW_EXPECT_OK(GetAzuriteEnv()->status());
+ // Remember where Azurite's debug log ends right now, so that TearDown()
+ // can dump only what was written after this point if the test fails.
+ ARROW_ASSIGN_OR_RAISE(debug_log_start_,
GetAzuriteEnv()->GetDebugLogSize());
AzureOptions options;
options.backend = AzureBackend::Azurite;
ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(
GetAzuriteEnv()->account_name(), GetAzuriteEnv()->account_key()));
return options;
}
+
+ // On failure, print the Azurite debug log written since MakeOptions().
+ void TearDown() override {
+ AzureFileSystemTest::TearDown();
+ if (HasFailure()) {
+ // XXX: The dump may miss some of this test's log lines because Azurite
+ // does not flush its debug log immediately; check the log file
+ // manually if needed.
+ ARROW_IGNORE_EXPR(GetAzuriteEnv()->DumpDebugLog(debug_log_start_));
+ }
+ }
+
+ // Size of the debug log when MakeOptions() ran; DumpDebugLog() starts here.
+ int64_t debug_log_start_ = 0;
};
class AzureFlatNamespaceFileSystemTest : public AzureFileSystemTest {
@@ -510,6 +569,103 @@ TEST_F(AzuriteFileSystemTest, CreateDirUri) {
ASSERT_RAISES(Invalid, fs_->CreateDir("abfs://" + RandomContainerName(),
true));
}
+// A path that names only a container deletes the container itself.
+TEST_F(AzuriteFileSystemTest, DeleteDirSuccessContainer) {
+  const auto container = RandomContainerName();
+  ASSERT_OK(fs_->CreateDir(container));
+  arrow::fs::AssertFileInfo(fs_.get(), container, FileType::Directory);
+  ASSERT_OK(fs_->DeleteDir(container));
+  arrow::fs::AssertFileInfo(fs_.get(), container, FileType::NotFound);
+}
+
+TEST_F(AzuriteFileSystemTest, DeleteDirSuccessEmpty) {
+  const auto dir =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  // Without hierarchical namespace support, directories are only virtual,
+  // so both CreateDir() and DeleteDir() are no-ops here.
+  ASSERT_OK(fs_->CreateDir(dir));
+  arrow::fs::AssertFileInfo(fs_.get(), dir, FileType::NotFound);
+  ASSERT_OK(fs_->DeleteDir(dir));
+  arrow::fs::AssertFileInfo(fs_.get(), dir, FileType::NotFound);
+}
+
+TEST_F(AzuriteFileSystemTest, DeleteDirSuccessNonexistent) {
+  const auto missing_dir =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  // Without hierarchical namespace support, directories are only virtual,
+  // so deleting one that never existed succeeds and does nothing.
+  ASSERT_OK(fs_->DeleteDir(missing_dir));
+  arrow::fs::AssertFileInfo(fs_.get(), missing_dir, FileType::NotFound);
+}
+
+TEST_F(AzuriteFileSystemTest, DeleteDirSuccessHaveBlobs) {
+#ifdef __APPLE__
+  GTEST_SKIP() << "This test fails by an Azurite problem: "
+                  "https://github.com/Azure/Azurite/pull/2302";
+#endif
+  const auto directory_path =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  // A single SubmitBatch() accepts at most 256 delete subrequests, so use at
+  // least 257 blobs to force DeleteDir() to page through ListBlobs().
+  const int64_t n_blobs = 257;
+  auto blob_path_at = [&](int64_t index) {
+    return internal::ConcatAbstractPath(directory_path, std::to_string(index) + ".txt");
+  };
+  for (int64_t i = 0; i < n_blobs; ++i) {
+    const auto blob_path = blob_path_at(i);
+    ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(blob_path));
+    ASSERT_OK(output->Write(std::string_view(std::to_string(i))));
+    ASSERT_OK(output->Close());
+    arrow::fs::AssertFileInfo(fs_.get(), blob_path, FileType::File);
+  }
+  ASSERT_OK(fs_->DeleteDir(directory_path));
+  for (int64_t i = 0; i < n_blobs; ++i) {
+    arrow::fs::AssertFileInfo(fs_.get(), blob_path_at(i), FileType::NotFound);
+  }
+}
+
+TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessEmpty) {
+  const auto dir =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  // With a hierarchical namespace, an empty directory is visible on its own.
+  ASSERT_OK(fs_->CreateDir(dir, /*recursive=*/true));
+  arrow::fs::AssertFileInfo(fs_.get(), dir, FileType::Directory);
+  ASSERT_OK(fs_->DeleteDir(dir));
+  arrow::fs::AssertFileInfo(fs_.get(), dir, FileType::NotFound);
+}
+
+TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirFailureNonexistent) {
+  // With a hierarchical namespace, deleting a directory that does not exist
+  // is reported as an IOError.
+  const auto missing_dir =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  ASSERT_RAISES(IOError, fs_->DeleteDir(missing_dir));
+}
+
+TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveBlob) {
+  const auto dir =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  const auto file_path = internal::ConcatAbstractPath(dir, "hello.txt");
+  // Deleting the directory must also delete the file inside it.
+  ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenOutputStream(file_path));
+  ASSERT_OK(stream->Write(std::string_view("hello")));
+  ASSERT_OK(stream->Close());
+  arrow::fs::AssertFileInfo(fs_.get(), file_path, FileType::File);
+  ASSERT_OK(fs_->DeleteDir(dir));
+  arrow::fs::AssertFileInfo(fs_.get(), file_path, FileType::NotFound);
+}
+
+TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveDirectory) {
+  const auto parent_dir =
+      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
+  const auto child_dir = internal::ConcatAbstractPath(parent_dir, "new-sub");
+  // Creating the child recursively also creates the parent.
+  ASSERT_OK(fs_->CreateDir(child_dir, /*recursive=*/true));
+  arrow::fs::AssertFileInfo(fs_.get(), child_dir, FileType::Directory);
+  arrow::fs::AssertFileInfo(fs_.get(), parent_dir, FileType::Directory);
+  // Deleting the parent removes the whole subtree.
+  ASSERT_OK(fs_->DeleteDir(parent_dir));
+  arrow::fs::AssertFileInfo(fs_.get(), child_dir, FileType::NotFound);
+  arrow::fs::AssertFileInfo(fs_.get(), parent_dir, FileType::NotFound);
+}
+
+TEST_F(AzuriteFileSystemTest, DeleteDirUri) {
+  // A URI is rejected as Invalid; DeleteDir() expects a plain path.
+  const auto uri = "abfs://" + PreexistingContainerPath();
+  ASSERT_RAISES(Invalid, fs_->DeleteDir(uri));
+}
+
TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));