This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 70ccf33bc5 GH-38705: [C++][FS][Azure] Implement CopyFile() (#39058)
70ccf33bc5 is described below
commit 70ccf33bc528bdba71efdfa78e08afb5a0904c09
Author: Sutou Kouhei <[email protected]>
AuthorDate: Thu Dec 7 14:26:21 2023 +0900
GH-38705: [C++][FS][Azure] Implement CopyFile() (#39058)
### Rationale for this change
`CopyFile()` copies the given source to the given destination. Both the
source and the destination must be blob names, as in other filesystem implementations.
### What changes are included in this PR?
Use the `CopyFromUri()` API, which should perform a server-side copy.
### Are these changes tested?
Yes.
### Are there any user-facing changes?
No.
* Closes: #38705
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/arrow/filesystem/azurefs.cc | 39 ++++++++++++++++++++++-----
cpp/src/arrow/filesystem/azurefs_test.cc | 45 ++++++++++++++++++++++++++++++++
2 files changed, 77 insertions(+), 7 deletions(-)
diff --git a/cpp/src/arrow/filesystem/azurefs.cc
b/cpp/src/arrow/filesystem/azurefs.cc
index 4efb802f8b..9bd2b0ae9d 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -115,6 +115,10 @@ struct AzureLocation {
return parent;
}
+ Result<AzureLocation> join(const std::string& stem) const {
+ return FromString(internal::ConcatAbstractPath(all, stem));
+ }
+
bool has_parent() const { return !path.empty(); }
bool empty() const { return container.empty() && path.empty(); }
@@ -149,6 +153,7 @@ Status ValidateFileLocation(const AzureLocation& location) {
if (location.path.empty()) {
return NotAFile(location);
}
+ ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));
return Status::OK();
}
@@ -818,7 +823,6 @@ class AzureFileSystem::Impl {
Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const AzureLocation&
location,
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));
- ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));
auto blob_client = std::make_shared<Azure::Storage::Blobs::BlobClient>(
blob_service_client_->GetBlobContainerClient(location.container)
.GetBlobClient(location.path));
@@ -831,7 +835,6 @@ class AzureFileSystem::Impl {
Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
AzureFileSystem* fs) {
- ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
if (info.type() == FileType::NotFound) {
return ::arrow::fs::internal::PathNotFound(info.path());
}
@@ -951,7 +954,6 @@ class AzureFileSystem::Impl {
const std::shared_ptr<const KeyValueMetadata>& metadata, const bool
truncate,
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));
- ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));
auto block_blob_client =
std::make_shared<Azure::Storage::Blobs::BlockBlobClient>(
blob_service_client_->GetBlobContainerClient(location.container)
@@ -971,7 +973,7 @@ class AzureFileSystem::Impl {
}
private:
- Status DeleteDirContentsWihtoutHierarchicalNamespace(const AzureLocation&
location,
+ Status DeleteDirContentsWithoutHierarchicalNamespace(const AzureLocation&
location,
bool missing_dir_ok) {
auto container_client =
blob_service_client_->GetBlobContainerClient(location.container);
@@ -1092,7 +1094,7 @@ class AzureFileSystem::Impl {
exception);
}
} else {
- return DeleteDirContentsWihtoutHierarchicalNamespace(location,
+ return DeleteDirContentsWithoutHierarchicalNamespace(location,
/*missing_dir_ok=*/true);
}
}
@@ -1149,9 +1151,30 @@ class AzureFileSystem::Impl {
}
return Status::OK();
} else {
- return DeleteDirContentsWihtoutHierarchicalNamespace(location,
missing_dir_ok);
+ return DeleteDirContentsWithoutHierarchicalNamespace(location,
missing_dir_ok);
}
}
+
+ Status CopyFile(const AzureLocation& src, const AzureLocation& dest) {
+ RETURN_NOT_OK(ValidateFileLocation(src));
+ RETURN_NOT_OK(ValidateFileLocation(dest));
+ if (src == dest) {
+ return Status::OK();
+ }
+ auto dest_blob_client =
blob_service_client_->GetBlobContainerClient(dest.container)
+ .GetBlobClient(dest.path);
+ auto src_url = blob_service_client_->GetBlobContainerClient(src.container)
+ .GetBlobClient(src.path)
+ .GetUrl();
+ try {
+ dest_blob_client.CopyFromUri(src_url);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "Failed to copy a blob. (" + src_url + " -> " +
dest_blob_client.GetUrl() + ")",
+ exception);
+ }
+ return Status::OK();
+ }
};
const AzureOptions& AzureFileSystem::options() const { return
impl_->options(); }
@@ -1208,7 +1231,9 @@ Status AzureFileSystem::Move(const std::string& src,
const std::string& dest) {
}
Status AzureFileSystem::CopyFile(const std::string& src, const std::string&
dest) {
- return Status::NotImplemented("The Azure FileSystem is not fully
implemented");
+ ARROW_ASSIGN_OR_RAISE(auto src_location, AzureLocation::FromString(src));
+ ARROW_ASSIGN_OR_RAISE(auto dest_location, AzureLocation::FromString(dest));
+ return impl_->CopyFile(src_location, dest_location);
}
Result<std::shared_ptr<io::InputStream>> AzureFileSystem::OpenInputStream(
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc
b/cpp/src/arrow/filesystem/azurefs_test.cc
index 1828c052e7..41f1663114 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -771,6 +771,51 @@ TEST_F(AzureHierarchicalNamespaceFileSystemTest,
DeleteDirContentsFailureNonexis
ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false));
}
+TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationNonexistent) {
+ const auto destination_path =
+ internal::ConcatAbstractPath(PreexistingContainerName(),
"copy-destionation");
+ ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), destination_path));
+ ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(destination_path));
+ ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info));
+ ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024));
+ EXPECT_EQ(kLoremIpsum, buffer->ToString());
+}
+
+TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationSame) {
+ ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), PreexistingObjectPath()));
+ ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(PreexistingObjectPath()));
+ ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info));
+ ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024));
+ EXPECT_EQ(kLoremIpsum, buffer->ToString());
+}
+
+TEST_F(AzuriteFileSystemTest, CopyFileFailureDestinationTrailingSlash) {
+ ASSERT_RAISES(IOError,
+ fs_->CopyFile(PreexistingObjectPath(),
+
internal::EnsureTrailingSlash(PreexistingObjectPath())));
+}
+
+TEST_F(AzuriteFileSystemTest, CopyFileFailureSourceNonexistent) {
+ const auto destination_path =
+ internal::ConcatAbstractPath(PreexistingContainerName(),
"copy-destionation");
+ ASSERT_RAISES(IOError, fs_->CopyFile(NotFoundObjectPath(),
destination_path));
+}
+
+TEST_F(AzuriteFileSystemTest, CopyFileFailureDestinationParentNonexistent) {
+ const auto destination_path =
+ internal::ConcatAbstractPath(RandomContainerName(), "copy-destionation");
+ ASSERT_RAISES(IOError, fs_->CopyFile(PreexistingObjectPath(),
destination_path));
+}
+
+TEST_F(AzuriteFileSystemTest, CopyFileUri) {
+ const auto destination_path =
+ internal::ConcatAbstractPath(PreexistingContainerName(),
"copy-destionation");
+ ASSERT_RAISES(Invalid,
+ fs_->CopyFile("abfs://" + PreexistingObjectPath(),
destination_path));
+ ASSERT_RAISES(Invalid,
+ fs_->CopyFile(PreexistingObjectPath(), "abfs://" +
destination_path));
+}
+
TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));