felipecrv commented on code in PR #39904:
URL: https://github.com/apache/arrow/pull/39904#discussion_r1483796207
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1690,17 +1924,438 @@ class AzureFileSystem::Impl {
}
}
+ private:
+ /// \brief Create a BlobLeaseClient and acquire a lease on the container.
+ ///
+ /// \param location must name a container (`location.container` non-empty);
+ /// its path component is not used.
+ /// \param lease_duration the lease duration requested from the service.
+ /// \param allow_missing_container if true, a nullptr may be returned when the
+ /// container doesn't exist, otherwise a PathNotFound(location) error is
+ /// produced right away
+ /// \param retry_allowed if true and the service answers with HTTP 409
+ /// "LeaseAlreadyPresent", wait via LeaseGuard::WaitUntilLatestKnownExpiryTime()
+ /// and try exactly once more (the retry passes retry_allowed=false, so a
+ /// second conflict is reported as an error).
+ /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's movable and
+ /// optional (nullptr denotes container not found). Any other storage
+ /// failure is converted to a Status through ExceptionToStatus().
+ Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireContainerLease(
+ const AzureLocation& location, std::chrono::seconds lease_duration,
+ bool allow_missing_container = false, bool retry_allowed = true) {
+ DCHECK(!location.container.empty());
+ auto container_client = GetBlobContainerClient(location.container);
+ auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId();
+ // Grab the URL now: container_client is moved into the lease client below,
+ // but the URL is still needed for the error message.
+ auto container_url = container_client.GetUrl();
+ auto lease_client = std::make_unique<Blobs::BlobLeaseClient>(
+ std::move(container_client), std::move(lease_id));
+ try {
+ // `result` is only read by the DCHECK_EQ below, hence [[maybe_unused]].
+ [[maybe_unused]] auto result = lease_client->Acquire(lease_duration);
+ DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId());
+ } catch (const Storage::StorageException& exception) {
+ if (IsContainerNotFound(exception)) {
+ if (allow_missing_container) {
+ return nullptr;
+ }
+ return PathNotFound(location);
+ } else if (exception.StatusCode == Http::HttpStatusCode::Conflict &&
+ exception.ErrorCode == "LeaseAlreadyPresent") {
+ // Someone else holds the lease. Wait once and retry once; without
+ // retry_allowed this falls through to the generic error below.
+ if (retry_allowed) {
+ LeaseGuard::WaitUntilLatestKnownExpiryTime();
+ return AcquireContainerLease(location, lease_duration,
allow_missing_container,
+ /*retry_allowed=*/false);
+ }
+ }
+ return ExceptionToStatus(exception, "Failed to acquire a lease on
container '",
+ location.container, "': ", container_url);
+ }
+ return lease_client;
+ }
+
+ /// \brief Create a BlobLeaseClient and acquire a lease on a blob/file (or
+ /// directory if Hierarchical Namespace is supported).
+ ///
+ /// \param location must have both a non-empty container and a non-empty
+ /// path; a trailing slash on the path is stripped before the blob client is
+ /// created, so "dir/" and "dir" lease the same blob.
+ /// \param lease_duration the lease duration requested from the service.
+ /// \param allow_missing if true, a nullptr may be returned when the blob
+ /// doesn't exist, otherwise a PathNotFound(location) error is produced right
+ /// away. Any HTTP 404 from the acquire call is treated as "missing".
+ /// NOTE(review): that presumably includes a missing container too — confirm.
+ /// \param retry_allowed if true and the service answers with HTTP 409
+ /// "LeaseAlreadyPresent", wait via LeaseGuard::WaitUntilLatestKnownExpiryTime()
+ /// and try exactly once more (the retry passes retry_allowed=false).
+ /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's movable and
+ /// optional (nullptr denotes blob not found). Any other storage failure is
+ /// converted to a Status through ExceptionToStatus().
+ Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireBlobLease(
+ const AzureLocation& location, std::chrono::seconds lease_duration,
+ bool allow_missing = false, bool retry_allowed = true) {
+ DCHECK(!location.container.empty() && !location.path.empty());
+ auto path = std::string{internal::RemoveTrailingSlash(location.path)};
+ auto blob_client = GetBlobClient(location.container, std::move(path));
+ auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId();
+ // Grab the URL now: blob_client is moved into the lease client below, but
+ // the URL is still needed for the error message.
+ auto blob_url = blob_client.GetUrl();
+ auto lease_client =
std::make_unique<Blobs::BlobLeaseClient>(std::move(blob_client),
+
std::move(lease_id));
+ try {
+ // `result` is only read by the DCHECK_EQ below, hence [[maybe_unused]].
+ [[maybe_unused]] auto result = lease_client->Acquire(lease_duration);
+ DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId());
+ } catch (const Storage::StorageException& exception) {
+ if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
+ if (allow_missing) {
+ return nullptr;
+ }
+ return PathNotFound(location);
+ } else if (exception.StatusCode == Http::HttpStatusCode::Conflict &&
+ exception.ErrorCode == "LeaseAlreadyPresent") {
+ // Someone else holds the lease. Wait once and retry once; without
+ // retry_allowed this falls through to the generic error below.
+ if (retry_allowed) {
+ LeaseGuard::WaitUntilLatestKnownExpiryTime();
+ return AcquireBlobLease(location, lease_duration, allow_missing,
+ /*retry_allowed=*/false);
+ }
+ }
+ return ExceptionToStatus(exception, "Failed to acquire a lease on file
'",
+ location.all, "': ", blob_url);
+ }
+ return lease_client;
+ }
+
+ /// \brief The default lease duration used for acquiring a lease on a
+ /// container or blob.
+ ///
+ /// Azure Storage leases can be acquired for a duration of 15 to 60 seconds.
+ /// The value below is the shortest of those durations.
+ ///
+ /// Operations consisting of an unpredictable number of sub-operations should
+ /// renew the lease periodically (heartbeat pattern) instead of acquiring an
+ /// infinite lease (very bad idea for a library like Arrow).
+ static constexpr auto kLeaseDuration = std::chrono::seconds{15};
+
+ // These are conservative estimates of how long it takes for the client
+ // request to reach the server, counting from the moment the Azure SDK
+ // function issuing the request is called. See their usage for more context
+ // (e.g. they are passed to LeaseGuard::StillValidFor() and
+ // LeaseGuard::BreakBeforeDeletion() right before the guarded request).
+ //
+ // If the client connection to the server is unpredictably slow, operations
+ // may fail, but due to the use of leases, the entire arrow::FileSystem
+ // operation can be retried without risk of data loss. Thus, unexpected
+ // network slowdowns can be fixed with retries (either by some system using
+ // Arrow or a user interactively retrying a failed operation).
+ //
+ // If a network is permanently slow, the lease time and these numbers should
+ // be increased, but not so much that the client gives up an operation
+ // because the values say it takes more time to reach the server than the
+ // remaining lease time on the resources.
+ //
+ // NOTE: The initial constant values were chosen conservatively. If we learn,
+ // from experience, that they are causing issues, we can increase them. And
+ // if broadly applicable values aren't possible, we can make them
+ // configurable.
+ static constexpr auto kTimeNeededForContainerDeletion =
std::chrono::seconds{3};
+ static constexpr auto kTimeNeededForContainerRename =
std::chrono::seconds{3};
+ static constexpr auto kTimeNeededForFileOrDirectoryRename =
std::chrono::seconds{3};
+
+ public:
+ /// The conditions for a successful container rename are derived from the
+ /// conditions for a successful `Move("/$src.container",
"/$dest.container")`.
+ /// The numbers here match the list in `Move`.
+ ///
+ /// 1. `src.container` must exist.
+ /// 2. If `src.container` and `dest.container` are the same, do nothing and
+ /// return OK.
+ /// 3. N/A.
+ /// 4. N/A.
+ /// 5. `dest.container` doesn't exist or is empty.
+ /// NOTE: one exception related to container Move is that when the
+ /// destination is empty we also require the source container to be empty,
+ /// because the only way to perform the "Move" is by deleting the source
+ /// instead of deleting the destination and renaming the source.
+ Status RenameContainer(const AzureLocation& src, const AzureLocation& dest) {
+ DCHECK(!src.container.empty() && src.path.empty());
+ DCHECK(!dest.container.empty() && dest.path.empty());
+ auto src_container_client = GetBlobContainerClient(src.container);
+
+ // If src and dest are the same, we only have to check src exists.
+ if (src.container == dest.container) {
+ ARROW_ASSIGN_OR_RAISE(auto info,
+ GetContainerPropsAsFileInfo(src,
src_container_client));
+ if (info.type() == FileType::NotFound) {
+ return PathNotFound(src);
+ }
+ DCHECK(info.type() == FileType::Directory);
+ return Status::OK();
+ }
+
+ // Acquire a lease on the source container because (1) we need the lease
+ // before rename and (2) it works as a way of checking the container
exists.
+ ARROW_ASSIGN_OR_RAISE(auto src_lease_client,
+ AcquireContainerLease(src, kLeaseDuration));
+ LeaseGuard src_lease_guard{std::move(src_lease_client), kLeaseDuration};
+ // Check dest.container doesn't exist or is empty.
+ auto dest_container_client = GetBlobContainerClient(dest.container);
+ std::optional<LeaseGuard> dest_lease_guard;
+ bool dest_exists = false;
+ bool dest_is_empty = false;
+ ARROW_ASSIGN_OR_RAISE(
+ auto dest_lease_client,
+ AcquireContainerLease(dest, kLeaseDuration,
/*allow_missing_container*/ true));
+ if (dest_lease_client) {
+ dest_lease_guard.emplace(std::move(dest_lease_client), kLeaseDuration);
+ dest_exists = true;
+ // Emptiness check after successful acquisition of the lease.
+ Blobs::ListBlobsOptions list_blobs_options;
+ list_blobs_options.PageSizeHint = 1;
+ try {
+ auto dest_list_response =
dest_container_client.ListBlobs(list_blobs_options);
+ dest_is_empty = dest_list_response.Blobs.empty();
+ if (!dest_is_empty) {
+ return NotEmpty(dest);
+ }
+ } catch (const Storage::StorageException& exception) {
+ return ExceptionToStatus(exception, "Failed to check that '",
dest.container,
+ "' is empty: ",
dest_container_client.GetUrl());
+ }
+ }
+ DCHECK(!dest_exists || dest_is_empty);
+
+ if (!dest_exists) {
+ // Rename the source container.
+ Blobs::RenameBlobContainerOptions options;
+ options.SourceAccessConditions.LeaseId = src_lease_guard.LeaseId();
+ try {
+ src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerRename);
+ blob_service_client_->RenameBlobContainer(src.container,
dest.container, options);
+ src_lease_guard.Forget();
+ } catch (const Storage::StorageException& exception) {
+ if (exception.StatusCode == Http::HttpStatusCode::BadRequest &&
+ exception.ErrorCode == "InvalidQueryParameterValue") {
+ auto param_name =
exception.AdditionalInformation.find("QueryParameterName");
+ if (param_name != exception.AdditionalInformation.end() &&
+ param_name->second == "comp") {
+ return ExceptionToStatus(
+ exception, "The 'rename' operation is not supported on
containers. ",
+ "Attempting a rename of '", src.container, "' to '",
dest.container,
+ "': ", blob_service_client_->GetUrl());
+ }
+ }
+ return ExceptionToStatus(exception, "Failed to rename container '",
src.container,
+ "' to '", dest.container,
+ "': ", blob_service_client_->GetUrl());
+ }
+ } else if (dest_is_empty) {
+ // Even if we deleted the empty dest.container, RenameBlobContainer()
would still
+ // fail because containers are not immediately deleted by the service --
they are
+ // deleted asynchronously based on retention policies and
non-deterministic behavior
+ // of the garbage collection process.
+ //
+ // One way to still match POSIX rename semantics is to delete the
src.container if
+ // and only if it's empty because the final state would be equivalent to
replacing
+ // the dest.container with the src.container and its contents (which
happens
+ // to also be empty).
+ Blobs::ListBlobsOptions list_blobs_options;
+ list_blobs_options.PageSizeHint = 1;
+ try {
+ auto src_list_response =
src_container_client.ListBlobs(list_blobs_options);
+ if (!src_list_response.Blobs.empty()) {
+ return Status::IOError("Unable to replace empty container: '",
dest.all, "'");
+ }
+ // Delete the source container now that we know it's empty.
+ Blobs::DeleteBlobContainerOptions options;
+ options.AccessConditions.LeaseId = src_lease_guard.LeaseId();
+ DCHECK(dest_lease_guard.has_value());
+ // Make sure lease on dest is still valid before deleting src. This is
to ensure
+ // the destination container is not deleted by another process/client
before
+ // Move() returns.
+ if (!dest_lease_guard->StillValidFor(kTimeNeededForContainerDeletion))
{
+ return Status::IOError("Unable to replace empty container: '",
dest.all, "'. ",
+ "Preparation for the operation took too long
and a "
+ "container lease expired.");
+ }
+ try {
+ src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerDeletion);
+ src_container_client.DeleteIfExists(options);
Review Comment:
Yes, `Delete` is enough. I'm changing it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]