felipecrv commented on code in PR #39904:
URL: https://github.com/apache/arrow/pull/39904#discussion_r1478775476
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1690,17 +1922,413 @@ class AzureFileSystem::Impl {
}
}
+ /// \brief Create a BlobLeaseClient and acquire a lease on the container.
+ ///
+ /// \param allow_missing_container if true, a nullptr may be returned when
the container
+ /// doesn't exist, otherwise a PathNotFound(location) error is produced
right away
+ /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and
+ /// optional (nullptr denotes container not found)
+ Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireContainerLease(
+ const AzureLocation& location, std::chrono::seconds lease_duration,
+ bool allow_missing_container = false, bool retry_allowed = true) {
+ DCHECK(!location.container.empty());
+ auto container_client = GetBlobContainerClient(location.container);
+ auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId();
+ auto container_url = container_client.GetUrl();
+ auto lease_client = std::make_unique<Blobs::BlobLeaseClient>(
+ std::move(container_client), std::move(lease_id));
+ try {
+ [[maybe_unused]] auto result = lease_client->Acquire(lease_duration);
+ DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId());
+ } catch (const Storage::StorageException& exception) {
+ if (IsContainerNotFound(exception)) {
+ if (allow_missing_container) {
+ return nullptr;
+ }
+ return PathNotFound(location);
+ } else if (exception.StatusCode == Http::HttpStatusCode::Conflict &&
+ exception.ErrorCode == "LeaseAlreadyPresent") {
+ if (retry_allowed) {
+ LeaseGuard::WaitUntilLatestKnownExpiryTime();
+ return AcquireContainerLease(location, lease_duration,
allow_missing_container,
+ /*retry_allowed=*/false);
+ }
+ }
+ return ExceptionToStatus(exception, "Failed to acquire a lease on
container '",
+ location.container, "': ", container_url);
+ }
+ return lease_client;
+ }
+
+ /// \brief Create a BlobLeaseClient and acquire a lease on a blob/file (or
+ /// directory if Hierarchical Namespace is supported).
+ ///
+ /// \param allow_missing if true, a nullptr may be returned when the blob
+ /// doesn't exist, otherwise a PathNotFound(location) error is produced
right away
+ /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and
+ /// optional (nullptr denotes blob not found)
+ Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireBlobLease(
+ const AzureLocation& location, std::chrono::seconds lease_duration,
+ bool allow_missing = false, bool retry_allowed = true) {
+ DCHECK(!location.container.empty() && !location.path.empty());
+ auto path = std::string{internal::RemoveTrailingSlash(location.path)};
+ auto blob_client = GetBlobClient(location.container, std::move(path));
+ auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId();
+ auto blob_url = blob_client.GetUrl();
+ auto lease_client =
std::make_unique<Blobs::BlobLeaseClient>(std::move(blob_client),
+
std::move(lease_id));
+ try {
+ [[maybe_unused]] auto result = lease_client->Acquire(lease_duration);
+ DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId());
+ } catch (const Storage::StorageException& exception) {
+ if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
+ if (allow_missing) {
+ return nullptr;
+ }
+ return PathNotFound(location);
+ } else if (exception.StatusCode == Http::HttpStatusCode::Conflict &&
+ exception.ErrorCode == "LeaseAlreadyPresent") {
+ if (retry_allowed) {
+ LeaseGuard::WaitUntilLatestKnownExpiryTime();
+ return AcquireBlobLease(location, lease_duration, allow_missing,
+ /*retry_allowed=*/false);
+ }
+ }
+ return ExceptionToStatus(exception, "Failed to acquire a lease on file
'",
+ location.all, "': ", blob_url);
+ }
+ return lease_client;
+ }
+
+ static constexpr auto kLeaseDuration = std::chrono::seconds{15};
+ static constexpr auto kTimeNeededForContainerDeletion =
std::chrono::seconds{3};
+ static constexpr auto kTimeNeededForContainerRename =
std::chrono::seconds{3};
+ static constexpr auto kTimeNeededForFileOrDirectoryRename =
std::chrono::seconds{3};
+ static constexpr auto kTimeNeededForEmptyDirectoryDeletion =
std::chrono::seconds{3};
+
+ /// The conditions for a successful container rename are derived from the
+ /// conditions for a successful `Move("/$src.container",
"/$dest.container")`.
+ /// The numbers here match the list in `Move`.
+ ///
+ /// 1. `src.container` must exist.
+ /// 2. If `src.container` and `dest.container` are the same, do nothing and
+ /// return OK.
+ /// 3. N/A.
+ /// 4. N/A.
+ /// 5. `dest.container` doesn't exist or is empty.
+ /// NOTE: one exception related to container Move is that when the
+ /// destination is empty we also require the source container to be empty,
+ /// because the only way to perform the "Move" is by deleting the source
+ /// instead of deleting the destination and renaming the source.
+ Status RenameContainer(const AzureLocation& src, const AzureLocation& dest) {
+ DCHECK(!src.container.empty() && src.path.empty());
+ DCHECK(!dest.container.empty() && dest.path.empty());
+ auto src_container_client = GetBlobContainerClient(src.container);
+
+ // If src and dest are the same, we only have to check src exists.
+ if (src.container == dest.container) {
+ ARROW_ASSIGN_OR_RAISE(auto info,
+ GetContainerPropsAsFileInfo(src,
src_container_client));
+ if (info.type() == FileType::NotFound) {
+ return PathNotFound(src);
+ }
+ DCHECK(info.type() == FileType::Directory);
+ return Status::OK();
+ }
+
+ // Acquire a lease on the source container because (1) we need the lease
+ // before rename and (2) it works as a way of checking the container
exists.
+ ARROW_ASSIGN_OR_RAISE(auto src_lease_client,
+ AcquireContainerLease(src, kLeaseDuration));
+ LeaseGuard src_lease_guard{std::move(src_lease_client), kLeaseDuration};
+ // Check dest.container doesn't exist or is empty.
+ auto dest_container_client = GetBlobContainerClient(dest.container);
+ std::optional<LeaseGuard> dest_lease_guard;
+ bool dest_exists = false;
+ bool dest_is_empty = false;
+ ARROW_ASSIGN_OR_RAISE(
+ auto dest_lease_client,
+ AcquireContainerLease(dest, kLeaseDuration,
/*allow_missing_container*/ true));
+ if (dest_lease_client) {
+ dest_lease_guard.emplace(std::move(dest_lease_client), kLeaseDuration);
+ dest_exists = true;
+ // Emptiness check after successful acquisition of the lease.
+ Blobs::ListBlobsOptions list_blobs_options;
+ list_blobs_options.PageSizeHint = 1;
+ try {
+ auto dest_list_response =
dest_container_client.ListBlobs(list_blobs_options);
+ dest_is_empty = dest_list_response.Blobs.empty();
+ if (!dest_is_empty) {
+ return NotEmpty(dest);
+ }
+ } catch (const Storage::StorageException& exception) {
+ return ExceptionToStatus(exception, "Failed to check that '",
dest.container,
+ "' is empty: ",
dest_container_client.GetUrl());
+ }
+ } else {
Review Comment:
Yes. I'm removing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]