kou commented on code in PR #39904: URL: https://github.com/apache/arrow/pull/39904#discussion_r1482369711
########## cpp/src/arrow/filesystem/azurefs.cc: ########## @@ -942,6 +972,185 @@ FileInfo FileInfoFromBlob(std::string_view container, return info; } +/// \brief RAII-style guard for releasing a lease on a blob or container. +/// +/// The guard should be constructed right after a successful BlobLeaseClient::Acquire() +/// call. Use std::optional<LeaseGuard> to declare a guard in outer scope and construct it +/// later with std::optional::emplace(...). +/// +/// Leases expire automatically, but explicit release means concurrent clients or +/// ourselves when trying new operations on the same blob or container don't have +/// to wait for the lease to expire by itself. +/// +/// Learn more about leases at +/// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob +class LeaseGuard { + public: + using SteadyClock = std::chrono::steady_clock; + + private: + /// \brief The time when the lease expires or is broken. + /// + /// The lease is not guaranteed to be valid until this time, but it is guaranteed to + /// be expired after this time. In other words, this is an overestimation of + /// the true time_point. + SteadyClock::time_point break_or_expires_at_; + const std::unique_ptr<Blobs::BlobLeaseClient> lease_client_; + bool release_attempt_pending_ = true; + + /// \brief The latest known expiry time of a lease guarded by this class + /// that failed to be released or was forgotten by calling Forget(). + static std::atomic<SteadyClock::time_point> latest_known_expiry_time_; + + /// \brief The maximum lease duration supported by Azure Storage. 
+ static constexpr std::chrono::seconds kMaxLeaseDuration{60}; + + public: + LeaseGuard(std::unique_ptr<Blobs::BlobLeaseClient> lease_client, + std::chrono::seconds lease_duration) + : break_or_expires_at_(SteadyClock::now() + + std::min(kMaxLeaseDuration, lease_duration)), + lease_client_(std::move(lease_client)) { + DCHECK(lease_duration <= kMaxLeaseDuration); + DCHECK(this->lease_client_); + } + + ARROW_DISALLOW_COPY_AND_ASSIGN(LeaseGuard); + + ~LeaseGuard() { + // No point in trying any error handling here other than the debug checking. The lease + // will eventually expire on the backend without any intervention from us (just much + // later than if we released it). + [[maybe_unused]] auto status = Release(); + ARROW_LOG(DEBUG) << status; + } + + bool PendingRelease() const { + return release_attempt_pending_ && SteadyClock::now() <= break_or_expires_at_; + } + + private: + Status DoRelease() { + DCHECK(release_attempt_pending_); + try { + lease_client_->Release(); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to release the ", + lease_client_->GetLeaseId(), " lease"); + } + return Status::OK(); + } + + public: + std::string LeaseId() const { return lease_client_->GetLeaseId(); } + + bool StillValidFor(SteadyClock::duration expected_time_left) const { + return SteadyClock::now() + expected_time_left < break_or_expires_at_; + } + + /// \brief Break the lease. + /// + /// The lease will stay in the "Breaking" state for break_period seconds or + /// less if the lease is expiring before that. 
+ /// + /// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-container#outcomes-of-use-attempts-on-containers-by-lease-state + /// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob#outcomes-of-use-attempts-on-blobs-by-lease-state + Status Break(Azure::Nullable<std::chrono::seconds> break_period = {}) { + auto remaining_time_ms = [this]() { + const auto remaing_time = break_or_expires_at_ - SteadyClock::now(); + return std::chrono::duration_cast<std::chrono::milliseconds>(remaing_time); Review Comment: ```suggestion const auto remaining_time = break_or_expires_at_ - SteadyClock::now(); return std::chrono::duration_cast<std::chrono::milliseconds>(remaining_time); ``` ########## cpp/src/arrow/filesystem/azurefs.cc: ########## @@ -1690,17 +1924,438 @@ class AzureFileSystem::Impl { } } + private: + /// \brief Create a BlobLeaseClient and acquire a lease on the container. + /// + /// \param allow_missing_container if true, a nullptr may be returned when the container + /// doesn't exist, otherwise a PathNotFound(location) error is produced right away + /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and + /// optional (nullptr denotes container not found) + Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireContainerLease( + const AzureLocation& location, std::chrono::seconds lease_duration, + bool allow_missing_container = false, bool retry_allowed = true) { + DCHECK(!location.container.empty()); + auto container_client = GetBlobContainerClient(location.container); + auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId(); + auto container_url = container_client.GetUrl(); + auto lease_client = std::make_unique<Blobs::BlobLeaseClient>( + std::move(container_client), std::move(lease_id)); + try { + [[maybe_unused]] auto result = lease_client->Acquire(lease_duration); + DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId()); + } catch (const Storage::StorageException& exception) { + if 
(IsContainerNotFound(exception)) { + if (allow_missing_container) { + return nullptr; + } + return PathNotFound(location); + } else if (exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "LeaseAlreadyPresent") { + if (retry_allowed) { + LeaseGuard::WaitUntilLatestKnownExpiryTime(); + return AcquireContainerLease(location, lease_duration, allow_missing_container, + /*retry_allowed=*/false); + } + } + return ExceptionToStatus(exception, "Failed to acquire a lease on container '", + location.container, "': ", container_url); + } + return lease_client; + } + + /// \brief Create a BlobLeaseClient and acquire a lease on a blob/file (or + /// directory if Hierarchical Namespace is supported). + /// + /// \param allow_missing if true, a nullptr may be returned when the blob + /// doesn't exist, otherwise a PathNotFound(location) error is produced right away + /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and + /// optional (nullptr denotes blob not found) + Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireBlobLease( + const AzureLocation& location, std::chrono::seconds lease_duration, + bool allow_missing = false, bool retry_allowed = true) { + DCHECK(!location.container.empty() && !location.path.empty()); + auto path = std::string{internal::RemoveTrailingSlash(location.path)}; + auto blob_client = GetBlobClient(location.container, std::move(path)); + auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId(); + auto blob_url = blob_client.GetUrl(); + auto lease_client = std::make_unique<Blobs::BlobLeaseClient>(std::move(blob_client), + std::move(lease_id)); + try { + [[maybe_unused]] auto result = lease_client->Acquire(lease_duration); + DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId()); + } catch (const Storage::StorageException& exception) { + if (exception.StatusCode == Http::HttpStatusCode::NotFound) { + if (allow_missing) { + return nullptr; + } + return PathNotFound(location); + } else if 
(exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "LeaseAlreadyPresent") { + if (retry_allowed) { + LeaseGuard::WaitUntilLatestKnownExpiryTime(); + return AcquireBlobLease(location, lease_duration, allow_missing, + /*retry_allowed=*/false); + } + } + return ExceptionToStatus(exception, "Failed to acquire a lease on file '", + location.all, "': ", blob_url); + } + return lease_client; + } + + /// \brief The default lease duration used for acquiring a lease on a container or blob. + /// + /// Azure Storage leases can be acquired for a duration of 15 to 60 seconds. + /// + /// Operations consisting of an unpredictable number of sub-operations should + /// renew the lease periodically (heartbeat pattern) instead of acquiring an + /// infinite lease (very bad idea for a library like Arrow). + static constexpr auto kLeaseDuration = std::chrono::seconds{15}; + + // These are conservative estimates of how long it takes for the client + // request to reach the server counting from the moment the Azure SDK function + // issuing the request is called. See their usage for more context. + // + // If the client connection to the server is unpredictably slow, operations + // may fail, but due to the use of leases, the entire arrow::FileSystem + // operation can be retried without risk of data loss. Thus, unexpected network + // slow downs can be fixed with retries (either by some system using Arrow or + // an user interactively retrying a failed operation). + // + // If a network is permanently slow, the lease time and these numbers should be + // increased but not so much so that the client gives up an operation because the + // values say it takes more time to reach the server than the remaining lease + // time on the resources. + // + // NOTE: The initial constant values were chosen conservatively. If we learn, + // from experience, that they are causing issues, we can increase them. 
And if + // broadly applicable values aren't possible, we can make them configurable. + static constexpr auto kTimeNeededForContainerDeletion = std::chrono::seconds{3}; + static constexpr auto kTimeNeededForContainerRename = std::chrono::seconds{3}; + static constexpr auto kTimeNeededForFileOrDirectoryRename = std::chrono::seconds{3}; + + public: + /// The conditions for a successful container rename are derived from the + /// conditions for a successful `Move("/$src.container", "/$dest.container")`. + /// The numbers here match the list in `Move`. + /// + /// 1. `src.container` must exist. + /// 2. If `src.container` and `dest.container` are the same, do nothing and + /// return OK. + /// 3. N/A. + /// 4. N/A. + /// 5. `dest.container` doesn't exist or is empty. + /// NOTE: one exception related to container Move is that when the + /// destination is empty we also require the source container to be empty, + /// because the only way to perform the "Move" is by deleting the source + /// instead of deleting the destination and renaming the source. + Status RenameContainer(const AzureLocation& src, const AzureLocation& dest) { + DCHECK(!src.container.empty() && src.path.empty()); + DCHECK(!dest.container.empty() && dest.path.empty()); + auto src_container_client = GetBlobContainerClient(src.container); + + // If src and dest are the same, we only have to check src exists. + if (src.container == dest.container) { + ARROW_ASSIGN_OR_RAISE(auto info, + GetContainerPropsAsFileInfo(src, src_container_client)); + if (info.type() == FileType::NotFound) { + return PathNotFound(src); + } + DCHECK(info.type() == FileType::Directory); + return Status::OK(); + } + + // Acquire a lease on the source container because (1) we need the lease + // before rename and (2) it works as a way of checking the container exists. 
+ ARROW_ASSIGN_OR_RAISE(auto src_lease_client, + AcquireContainerLease(src, kLeaseDuration)); + LeaseGuard src_lease_guard{std::move(src_lease_client), kLeaseDuration}; Review Comment: We must pass the same lease duration to both `AcquireContainerLease()` and `LeaseGuard::LeaseGuard()`, right? That is easy to misuse. Can `AcquireContainerLease()` return a `std::unique_ptr<LeaseGuard>` so that callers don't have to create a `LeaseGuard` manually? ########## cpp/src/arrow/filesystem/azurefs.cc: ########## @@ -1690,17 +1924,438 @@ class AzureFileSystem::Impl { } } + private: + /// \brief Create a BlobLeaseClient and acquire a lease on the container. + /// + /// \param allow_missing_container if true, a nullptr may be returned when the container + /// doesn't exist, otherwise a PathNotFound(location) error is produced right away + /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and + /// optional (nullptr denotes container not found) + Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireContainerLease( + const AzureLocation& location, std::chrono::seconds lease_duration, + bool allow_missing_container = false, bool retry_allowed = true) { + DCHECK(!location.container.empty()); + auto container_client = GetBlobContainerClient(location.container); + auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId(); + auto container_url = container_client.GetUrl(); + auto lease_client = std::make_unique<Blobs::BlobLeaseClient>( + std::move(container_client), std::move(lease_id)); + try { + [[maybe_unused]] auto result = lease_client->Acquire(lease_duration); + DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId()); + } catch (const Storage::StorageException& exception) { + if (IsContainerNotFound(exception)) { + if (allow_missing_container) { + return nullptr; + } + return PathNotFound(location); + } else if (exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "LeaseAlreadyPresent") { + if (retry_allowed) { + 
LeaseGuard::WaitUntilLatestKnownExpiryTime(); + return AcquireContainerLease(location, lease_duration, allow_missing_container, + /*retry_allowed=*/false); + } + } + return ExceptionToStatus(exception, "Failed to acquire a lease on container '", + location.container, "': ", container_url); + } + return lease_client; + } + + /// \brief Create a BlobLeaseClient and acquire a lease on a blob/file (or + /// directory if Hierarchical Namespace is supported). + /// + /// \param allow_missing if true, a nullptr may be returned when the blob + /// doesn't exist, otherwise a PathNotFound(location) error is produced right away + /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and + /// optional (nullptr denotes blob not found) + Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireBlobLease( + const AzureLocation& location, std::chrono::seconds lease_duration, + bool allow_missing = false, bool retry_allowed = true) { + DCHECK(!location.container.empty() && !location.path.empty()); + auto path = std::string{internal::RemoveTrailingSlash(location.path)}; + auto blob_client = GetBlobClient(location.container, std::move(path)); + auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId(); + auto blob_url = blob_client.GetUrl(); + auto lease_client = std::make_unique<Blobs::BlobLeaseClient>(std::move(blob_client), + std::move(lease_id)); + try { + [[maybe_unused]] auto result = lease_client->Acquire(lease_duration); + DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId()); + } catch (const Storage::StorageException& exception) { + if (exception.StatusCode == Http::HttpStatusCode::NotFound) { + if (allow_missing) { + return nullptr; + } + return PathNotFound(location); + } else if (exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "LeaseAlreadyPresent") { + if (retry_allowed) { + LeaseGuard::WaitUntilLatestKnownExpiryTime(); + return AcquireBlobLease(location, lease_duration, allow_missing, + 
/*retry_allowed=*/false); + } + } + return ExceptionToStatus(exception, "Failed to acquire a lease on file '", + location.all, "': ", blob_url); + } + return lease_client; + } + + /// \brief The default lease duration used for acquiring a lease on a container or blob. + /// + /// Azure Storage leases can be acquired for a duration of 15 to 60 seconds. + /// + /// Operations consisting of an unpredictable number of sub-operations should + /// renew the lease periodically (heartbeat pattern) instead of acquiring an + /// infinite lease (very bad idea for a library like Arrow). + static constexpr auto kLeaseDuration = std::chrono::seconds{15}; + + // These are conservative estimates of how long it takes for the client + // request to reach the server counting from the moment the Azure SDK function + // issuing the request is called. See their usage for more context. + // + // If the client connection to the server is unpredictably slow, operations + // may fail, but due to the use of leases, the entire arrow::FileSystem + // operation can be retried without risk of data loss. Thus, unexpected network + // slow downs can be fixed with retries (either by some system using Arrow or + // an user interactively retrying a failed operation). + // + // If a network is permanently slow, the lease time and these numbers should be + // increased but not so much so that the client gives up an operation because the + // values say it takes more time to reach the server than the remaining lease + // time on the resources. + // + // NOTE: The initial constant values were chosen conservatively. If we learn, + // from experience, that they are causing issues, we can increase them. And if + // broadly applicable values aren't possible, we can make them configurable. 
+ static constexpr auto kTimeNeededForContainerDeletion = std::chrono::seconds{3}; + static constexpr auto kTimeNeededForContainerRename = std::chrono::seconds{3}; + static constexpr auto kTimeNeededForFileOrDirectoryRename = std::chrono::seconds{3}; + + public: + /// The conditions for a successful container rename are derived from the + /// conditions for a successful `Move("/$src.container", "/$dest.container")`. + /// The numbers here match the list in `Move`. + /// + /// 1. `src.container` must exist. + /// 2. If `src.container` and `dest.container` are the same, do nothing and + /// return OK. + /// 3. N/A. + /// 4. N/A. + /// 5. `dest.container` doesn't exist or is empty. + /// NOTE: one exception related to container Move is that when the + /// destination is empty we also require the source container to be empty, + /// because the only way to perform the "Move" is by deleting the source + /// instead of deleting the destination and renaming the source. + Status RenameContainer(const AzureLocation& src, const AzureLocation& dest) { + DCHECK(!src.container.empty() && src.path.empty()); + DCHECK(!dest.container.empty() && dest.path.empty()); + auto src_container_client = GetBlobContainerClient(src.container); + + // If src and dest are the same, we only have to check src exists. + if (src.container == dest.container) { + ARROW_ASSIGN_OR_RAISE(auto info, + GetContainerPropsAsFileInfo(src, src_container_client)); + if (info.type() == FileType::NotFound) { + return PathNotFound(src); + } + DCHECK(info.type() == FileType::Directory); + return Status::OK(); + } + + // Acquire a lease on the source container because (1) we need the lease + // before rename and (2) it works as a way of checking the container exists. + ARROW_ASSIGN_OR_RAISE(auto src_lease_client, + AcquireContainerLease(src, kLeaseDuration)); + LeaseGuard src_lease_guard{std::move(src_lease_client), kLeaseDuration}; + // Check dest.container doesn't exist or is empty. 
+ auto dest_container_client = GetBlobContainerClient(dest.container); + std::optional<LeaseGuard> dest_lease_guard; + bool dest_exists = false; + bool dest_is_empty = false; + ARROW_ASSIGN_OR_RAISE( + auto dest_lease_client, + AcquireContainerLease(dest, kLeaseDuration, /*allow_missing_container*/ true)); + if (dest_lease_client) { + dest_lease_guard.emplace(std::move(dest_lease_client), kLeaseDuration); + dest_exists = true; + // Emptiness check after successful acquisition of the lease. + Blobs::ListBlobsOptions list_blobs_options; + list_blobs_options.PageSizeHint = 1; + try { + auto dest_list_response = dest_container_client.ListBlobs(list_blobs_options); + dest_is_empty = dest_list_response.Blobs.empty(); + if (!dest_is_empty) { + return NotEmpty(dest); + } + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to check that '", dest.container, + "' is empty: ", dest_container_client.GetUrl()); + } + } + DCHECK(!dest_exists || dest_is_empty); + + if (!dest_exists) { + // Rename the source container. + Blobs::RenameBlobContainerOptions options; + options.SourceAccessConditions.LeaseId = src_lease_guard.LeaseId(); + try { + src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerRename); + blob_service_client_->RenameBlobContainer(src.container, dest.container, options); + src_lease_guard.Forget(); + } catch (const Storage::StorageException& exception) { + if (exception.StatusCode == Http::HttpStatusCode::BadRequest && + exception.ErrorCode == "InvalidQueryParameterValue") { + auto param_name = exception.AdditionalInformation.find("QueryParameterName"); + if (param_name != exception.AdditionalInformation.end() && + param_name->second == "comp") { + return ExceptionToStatus( + exception, "The 'rename' operation is not supported on containers. 
", + "Attempting a rename of '", src.container, "' to '", dest.container, + "': ", blob_service_client_->GetUrl()); + } + } + return ExceptionToStatus(exception, "Failed to rename container '", src.container, + "' to '", dest.container, + "': ", blob_service_client_->GetUrl()); + } + } else if (dest_is_empty) { + // Even if we deleted the empty dest.container, RenameBlobContainer() would still + // fail because containers are not immediately deleted by the service -- they are + // deleted asynchronously based on retention policies and non-deterministic behavior + // of the garbage collection process. + // + // One way to still match POSIX rename semantics is to delete the src.container if + // and only if it's empty because the final state would be equivalent to replacing + // the dest.container with the src.container and its contents (which happens + // to also be empty). + Blobs::ListBlobsOptions list_blobs_options; + list_blobs_options.PageSizeHint = 1; + try { + auto src_list_response = src_container_client.ListBlobs(list_blobs_options); + if (!src_list_response.Blobs.empty()) { + return Status::IOError("Unable to replace empty container: '", dest.all, "'"); + } + // Delete the source container now that we know it's empty. + Blobs::DeleteBlobContainerOptions options; + options.AccessConditions.LeaseId = src_lease_guard.LeaseId(); + DCHECK(dest_lease_guard.has_value()); + // Make sure lease on dest is still valid before deleting src. This is to ensure + // the destination container is not deleted by another process/client before + // Move() returns. + if (!dest_lease_guard->StillValidFor(kTimeNeededForContainerDeletion)) { + return Status::IOError("Unable to replace empty container: '", dest.all, "'. 
", + "Preparation for the operation took too long and a " + "container lease expired."); + } + try { + src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerDeletion); + src_container_client.DeleteIfExists(options); Review Comment: Why do we need to use `DeleteIfExists()` here? Can we use `Delete()` here? ########## cpp/src/arrow/filesystem/azurefs_test.cc: ########## @@ -854,6 +888,395 @@ class TestAzureFileSystem : public ::testing::Test { const auto directory_path = data.RandomDirectoryPath(rng_); ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false)); } + + private: + using StringMatcher = + ::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>; + + StringMatcher HasDirMoveToSubdirMessage(const std::string& src, + const std::string& dest) { + return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + src + + "' a sub-directory of itself."); + } + + StringMatcher HasCrossContainerNotImplementedMessage(const std::string& container_name, + const std::string& dest) { + return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest + + "' requires moving data between " + "containers, which is not implemented."); + } + + StringMatcher HasMissingParentDirMessage(const std::string& dest) { + return ::testing::HasSubstr("The parent directory of the destination path '" + dest + + "' does not exist."); + } + + /// \brief Expected POSIX semantics for the rename operation on multiple + /// scenarios. + /// + /// If the src doesn't exist, the error is always ENOENT, otherwise we are + /// left with the following combinations: + /// + /// 1. src's type + /// a. File + /// b. Directory + /// 2. dest's existence + /// a. NotFound + /// b. File + /// c. Directory + /// - empty + /// - non-empty + /// 3. src path has a trailing slash (or not) + /// 4. 
dest path has a trailing slash (or not) + /// + /// Limitations: this function doesn't consider paths so it assumes that the + /// paths don't lead requests for moves that would make the source a subdir of + /// the destination. + /// + /// \param paths_are_equal src and dest paths without trailing slashes are equal + /// \return std::nullopt if success is expected in the scenario or the errno + /// if failure is expected. + static std::optional<int> RenameSemantics(FileType src_type, bool src_trailing_slash, + FileType dest_type, bool dest_trailing_slash, + bool dest_is_empty_dir = false, + bool paths_are_equal = false) { + DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown); + DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory) + << "dest_is_empty_dir must imply dest_type == FileType::Directory"; + switch (src_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + return {ENOENT}; + case FileType::File: + switch (dest_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + if (src_trailing_slash) { + return {ENOTDIR}; + } + if (dest_trailing_slash) { + // A slash on the destination path requires that it exists, + // so a confirmation that it's a directory can be performed. + return {ENOENT}; + } + return {}; + case FileType::File: + if (src_trailing_slash || dest_trailing_slash) { + return {ENOTDIR}; + } + // The existing file is replaced successfuly. + return {}; + case FileType::Directory: + if (src_trailing_slash) { + return {ENOTDIR}; + } + return EISDIR; + } + break; + case FileType::Directory: + switch (dest_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + // We don't have to care about the slashes when the source is a directory. 
+ return {}; + case FileType::File: + return {ENOTDIR}; + case FileType::Directory: + if (!paths_are_equal && !dest_is_empty_dir) { + return {ENOTEMPTY}; + } + return {}; + } + break; + } + Unreachable("Invalid parameters passed to RenameSemantics"); + } + + Status CheckExpectedErrno(const std::string& src, const std::string& dest, + std::optional<int> expected_errno, + const char* expected_errno_name, FileInfo* out_src_info) { + auto the_fs = fs(); + const bool src_trailing_slash = internal::HasTrailingSlash(src); + const bool dest_trailing_slash = internal::HasTrailingSlash(dest); + const auto src_path = std::string{internal::RemoveTrailingSlash(src)}; + const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)}; + ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path)); + ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path)); + bool dest_is_empty_dir = false; + if (dest_info.type() == FileType::Directory) { + FileSelector select; + select.base_dir = dest_path; + select.recursive = false; + // TODO(felipecrv): investigate why this can't be false + select.allow_not_found = true; + ARROW_ASSIGN_OR_RAISE(auto dest_contents, the_fs->GetFileInfo(select)); + if (dest_contents.empty()) { + dest_is_empty_dir = true; + } + } + auto paths_are_equal = src_path == dest_path; + auto truly_expected_errno = + RenameSemantics(out_src_info->type(), src_trailing_slash, dest_info.type(), + dest_trailing_slash, dest_is_empty_dir, paths_are_equal); + if (truly_expected_errno != expected_errno) { + if (expected_errno.has_value()) { + return Status::Invalid("expected_errno=", expected_errno_name, "=", + *expected_errno, + " used in ASSERT_MOVE is incorrect. 
" + "POSIX semantics for this scenario require errno=", + strerror(truly_expected_errno.value_or(0))); + } else { + DCHECK(truly_expected_errno.has_value()); + return Status::Invalid( + "ASSERT_MOVE used to assert success in a scenario for which " + "POSIX semantics requires errno=", + strerror(*truly_expected_errno)); + } + } + return Status::OK(); + } + + void AssertAfterMove(const std::string& src, const std::string& dest, FileType type) { + if (internal::RemoveTrailingSlash(src) != internal::RemoveTrailingSlash(dest)) { + AssertFileInfo(fs(), src, FileType::NotFound); + } + AssertFileInfo(fs(), dest, type); + } + + static bool WithErrno(const Status& status, int expected_errno) { + auto* detail = status.detail().get(); + return detail && + arrow::internal::ErrnoFromStatusDetail(*detail).value_or(-1) == expected_errno; + } + + std::optional<StringMatcher> MoveErrorMessageMatcher(const FileInfo& src_info, + const std::string& src, + const std::string& dest, + int for_errno) { + switch (for_errno) { + case ENOENT: { + auto& path = src_info.type() == FileType::NotFound ? src : dest; + return ::testing::HasSubstr("Path does not exist '" + path + "'"); + } + case ENOTEMPTY: + return ::testing::HasSubstr("Directory not empty: '" + dest + "'"); + } + return std::nullopt; + } + +#define ASSERT_MOVE(src, dest, expected_errno) \ + do { \ + auto _src = (src); \ + auto _dest = (dest); \ + std::optional<int> _expected_errno = (expected_errno); \ + FileInfo _src_info; \ + ASSERT_OK( \ + CheckExpectedErrno(_src, _dest, _expected_errno, #expected_errno, &_src_info)); \ + auto _move_st = ::arrow::internal::GenericToStatus(fs()->Move(_src, _dest)); \ + if (_expected_errno.has_value()) { \ + if (WithErrno(_move_st, *_expected_errno)) { \ + /* If the Move failed, the source should remain unchanged. 
*/ \ + AssertFileInfo(fs(), std::string{internal::RemoveTrailingSlash(_src)}, \ + _src_info.type()); \ + auto _message_matcher = \ + MoveErrorMessageMatcher(_src_info, _src, _dest, *_expected_errno); \ + if (_message_matcher.has_value()) { \ + EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, *_message_matcher, _move_st); \ + } else { \ + SUCCEED(); \ + } \ + } else { \ + FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) \ + << "' did not fail with errno=" << #expected_errno; \ + } \ + } else { \ + if (!_move_st.ok()) { \ + FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) \ + << "' failed with " << _move_st.ToString(); \ + } else { \ + AssertAfterMove(_src, _dest, _src_info.type()); \ + } \ + } \ + } while (false) + +#define ASSERT_MOVE_OK(src, dest) ASSERT_MOVE((src), (dest), std::nullopt) + + // Tests for Move() + + public: + void TestRenameContainer() { + EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv()); + auto data = SetUpPreexistingData(); + // Container exists, so renaming to the same name succeeds because it's a no-op. + ASSERT_MOVE_OK(data.container_name, data.container_name); + // Renaming a container that doesn't exist fails. + ASSERT_MOVE("missing-container", "missing-container", ENOENT); + ASSERT_MOVE("missing-container", data.container_name, ENOENT); + // Renaming a container to an existing non-empty container fails. 
+ auto non_empty_container = PreexistingData::RandomContainerName(rng_); + auto non_empty_container_client = CreateContainer(non_empty_container); + CreateBlob(non_empty_container_client, "object1", PreexistingData::kLoremIpsum); + ASSERT_MOVE(data.container_name, non_empty_container, ENOTEMPTY); + // Renaming to an empty container fails to replace it + auto empty_container = PreexistingData::RandomContainerName(rng_); + auto empty_container_client = CreateContainer(empty_container); + EXPECT_RAISES_WITH_MESSAGE_THAT( + IOError, + ::testing::HasSubstr("Unable to replace empty container: '" + empty_container + + "'"), + fs()->Move(data.container_name, empty_container)); + // Renaming to a non-existing container creates it + auto missing_container = PreexistingData::RandomContainerName(rng_); + AssertFileInfo(fs(), missing_container, FileType::NotFound); + if (env->backend() == AzureBackend::kAzurite) { + // Azurite returns a 201 Created for RenameBlobContainer, but the created + // container doesn't contain the blobs from the source container and + // the source container remains undeleted after the "rename". 
+ } else { + // See Azure SDK issue/question: + // https://github.com/Azure/azure-sdk-for-cpp/issues/5262 + EXPECT_RAISES_WITH_MESSAGE_THAT( + IOError, + ::testing::HasSubstr("The 'rename' operation is not supported on containers."), + fs()->Move(data.container_name, missing_container)); + // ASSERT_MOVE_OK(data.container_name, missing_container); + // AssertFileInfo(fs(), + // ConcatAbstractPath(missing_container, + // PreexistingData::kObjectName), FileType::File); + } + // Renaming to an empty container can work if the source is also empty + auto new_empty_container = PreexistingData::RandomContainerName(rng_); + auto new_empty_container_client = CreateContainer(new_empty_container); + ASSERT_MOVE_OK(empty_container, new_empty_container); + } + + void TestMoveContainerToPath() { + auto data = SetUpPreexistingData(); + ASSERT_MOVE("missing-container", data.ContainerPath("new-subdir"), ENOENT); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + HasDirMoveToSubdirMessage(data.container_name, data.ContainerPath("new-subdir")), + fs()->Move(data.container_name, data.ContainerPath("new-subdir"))); + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, + HasCrossContainerNotImplementedMessage(data.container_name, + "a-container/new-subdir"), + fs()->Move(data.container_name, "a-container/new-subdir")); Review Comment: ```suggestion "missing-container/new-subdir"), fs()->Move(data.container_name, "missing-container/new-subdir")); ``` ########## cpp/src/arrow/filesystem/azurefs.cc: ########## @@ -1115,6 +1332,22 @@ class AzureFileSystem::Impl { } } + Result<FileInfo> GetFileInfoOfPathWithinContainer(const AzureLocation& location) { Review Comment: It seems that most of this code is the same as `AzureFileSystem::GetFileInfo()`. How about renaming this to `GetFileInfo(const AzureLocation& location)` and moving the `AzureFileSystem::GetFileInfo()` implementation here? 
########## cpp/src/arrow/filesystem/azurefs.cc: ########## @@ -1879,7 +2525,24 @@ Status AzureFileSystem::DeleteFile(const std::string& path) { } Status AzureFileSystem::Move(const std::string& src, const std::string& dest) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto src_location, AzureLocation::FromString(src)); + ARROW_ASSIGN_OR_RAISE(auto dest_location, AzureLocation::FromString(dest)); + if (src_location.container.empty()) { + return Status::Invalid("Move requires a non-empty source path."); + } + if (dest_location.container.empty()) { + return Status::Invalid("Move requires a non-empty destination path."); + } + if (src_location.path.empty()) { + if (dest_location.path.empty()) { + return impl_->RenameContainer(src_location, dest_location); + } + return impl_->MoveContainerToPath(src_location, dest_location); + } + if (dest_location.path.empty()) { + return impl_->CreateContainerFromPath(src_location, dest_location); + } + return impl_->MovePaths(src_location, dest_location); Review Comment: `MovePath()`? ########## cpp/src/arrow/filesystem/azurefs.cc: ########## @@ -1690,17 +1924,438 @@ class AzureFileSystem::Impl { } } + private: + /// \brief Create a BlobLeaseClient and acquire a lease on the container. 
+ /// + /// \param allow_missing_container if true, a nullptr may be returned when the container + /// doesn't exist, otherwise a PathNotFound(location) error is produced right away + /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and + /// optional (nullptr denotes container not found) + Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireContainerLease( + const AzureLocation& location, std::chrono::seconds lease_duration, + bool allow_missing_container = false, bool retry_allowed = true) { + DCHECK(!location.container.empty()); + auto container_client = GetBlobContainerClient(location.container); + auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId(); + auto container_url = container_client.GetUrl(); + auto lease_client = std::make_unique<Blobs::BlobLeaseClient>( + std::move(container_client), std::move(lease_id)); + try { + [[maybe_unused]] auto result = lease_client->Acquire(lease_duration); + DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId()); + } catch (const Storage::StorageException& exception) { + if (IsContainerNotFound(exception)) { + if (allow_missing_container) { + return nullptr; + } + return PathNotFound(location); + } else if (exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "LeaseAlreadyPresent") { + if (retry_allowed) { + LeaseGuard::WaitUntilLatestKnownExpiryTime(); + return AcquireContainerLease(location, lease_duration, allow_missing_container, + /*retry_allowed=*/false); + } + } + return ExceptionToStatus(exception, "Failed to acquire a lease on container '", + location.container, "': ", container_url); + } + return lease_client; + } + + /// \brief Create a BlobLeaseClient and acquire a lease on a blob/file (or + /// directory if Hierarchical Namespace is supported). 
+ /// + /// \param allow_missing if true, a nullptr may be returned when the blob + /// doesn't exist, otherwise a PathNotFound(location) error is produced right away + /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and + /// optional (nullptr denotes blob not found) + Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireBlobLease( + const AzureLocation& location, std::chrono::seconds lease_duration, + bool allow_missing = false, bool retry_allowed = true) { + DCHECK(!location.container.empty() && !location.path.empty()); + auto path = std::string{internal::RemoveTrailingSlash(location.path)}; + auto blob_client = GetBlobClient(location.container, std::move(path)); + auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId(); + auto blob_url = blob_client.GetUrl(); + auto lease_client = std::make_unique<Blobs::BlobLeaseClient>(std::move(blob_client), + std::move(lease_id)); + try { + [[maybe_unused]] auto result = lease_client->Acquire(lease_duration); + DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId()); + } catch (const Storage::StorageException& exception) { + if (exception.StatusCode == Http::HttpStatusCode::NotFound) { + if (allow_missing) { + return nullptr; + } + return PathNotFound(location); + } else if (exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "LeaseAlreadyPresent") { + if (retry_allowed) { + LeaseGuard::WaitUntilLatestKnownExpiryTime(); + return AcquireBlobLease(location, lease_duration, allow_missing, + /*retry_allowed=*/false); + } + } + return ExceptionToStatus(exception, "Failed to acquire a lease on file '", + location.all, "': ", blob_url); + } + return lease_client; + } + + /// \brief The default lease duration used for acquiring a lease on a container or blob. + /// + /// Azure Storage leases can be acquired for a duration of 15 to 60 seconds. 
+ /// + /// Operations consisting of an unpredictable number of sub-operations should + /// renew the lease periodically (heartbeat pattern) instead of acquiring an + /// infinite lease (very bad idea for a library like Arrow). + static constexpr auto kLeaseDuration = std::chrono::seconds{15}; + + // These are conservative estimates of how long it takes for the client + // request to reach the server counting from the moment the Azure SDK function + // issuing the request is called. See their usage for more context. + // + // If the client connection to the server is unpredictably slow, operations + // may fail, but due to the use of leases, the entire arrow::FileSystem + // operation can be retried without risk of data loss. Thus, unexpected network + // slow downs can be fixed with retries (either by some system using Arrow or + // an user interactively retrying a failed operation). + // + // If a network is permanently slow, the lease time and these numbers should be + // increased but not so much so that the client gives up an operation because the + // values say it takes more time to reach the server than the remaining lease + // time on the resources. + // + // NOTE: The initial constant values were chosen conservatively. If we learn, + // from experience, that they are causing issues, we can increase them. And if + // broadly applicable values aren't possible, we can make them configurable. + static constexpr auto kTimeNeededForContainerDeletion = std::chrono::seconds{3}; + static constexpr auto kTimeNeededForContainerRename = std::chrono::seconds{3}; + static constexpr auto kTimeNeededForFileOrDirectoryRename = std::chrono::seconds{3}; + + public: + /// The conditions for a successful container rename are derived from the + /// conditions for a successful `Move("/$src.container", "/$dest.container")`. + /// The numbers here match the list in `Move`. + /// + /// 1. `src.container` must exist. + /// 2. 
If `src.container` and `dest.container` are the same, do nothing and + /// return OK. + /// 3. N/A. + /// 4. N/A. + /// 5. `dest.container` doesn't exist or is empty. + /// NOTE: one exception related to container Move is that when the + /// destination is empty we also require the source container to be empty, + /// because the only way to perform the "Move" is by deleting the source + /// instead of deleting the destination and renaming the source. + Status RenameContainer(const AzureLocation& src, const AzureLocation& dest) { + DCHECK(!src.container.empty() && src.path.empty()); + DCHECK(!dest.container.empty() && dest.path.empty()); + auto src_container_client = GetBlobContainerClient(src.container); + + // If src and dest are the same, we only have to check src exists. + if (src.container == dest.container) { + ARROW_ASSIGN_OR_RAISE(auto info, + GetContainerPropsAsFileInfo(src, src_container_client)); + if (info.type() == FileType::NotFound) { + return PathNotFound(src); + } + DCHECK(info.type() == FileType::Directory); + return Status::OK(); + } + + // Acquire a lease on the source container because (1) we need the lease + // before rename and (2) it works as a way of checking the container exists. + ARROW_ASSIGN_OR_RAISE(auto src_lease_client, + AcquireContainerLease(src, kLeaseDuration)); + LeaseGuard src_lease_guard{std::move(src_lease_client), kLeaseDuration}; + // Check dest.container doesn't exist or is empty. + auto dest_container_client = GetBlobContainerClient(dest.container); + std::optional<LeaseGuard> dest_lease_guard; + bool dest_exists = false; + bool dest_is_empty = false; + ARROW_ASSIGN_OR_RAISE( + auto dest_lease_client, + AcquireContainerLease(dest, kLeaseDuration, /*allow_missing_container*/ true)); + if (dest_lease_client) { + dest_lease_guard.emplace(std::move(dest_lease_client), kLeaseDuration); + dest_exists = true; + // Emptiness check after successful acquisition of the lease. 
+ Blobs::ListBlobsOptions list_blobs_options; + list_blobs_options.PageSizeHint = 1; + try { + auto dest_list_response = dest_container_client.ListBlobs(list_blobs_options); + dest_is_empty = dest_list_response.Blobs.empty(); + if (!dest_is_empty) { + return NotEmpty(dest); + } + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to check that '", dest.container, + "' is empty: ", dest_container_client.GetUrl()); + } + } + DCHECK(!dest_exists || dest_is_empty); + + if (!dest_exists) { + // Rename the source container. + Blobs::RenameBlobContainerOptions options; + options.SourceAccessConditions.LeaseId = src_lease_guard.LeaseId(); + try { + src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerRename); + blob_service_client_->RenameBlobContainer(src.container, dest.container, options); + src_lease_guard.Forget(); + } catch (const Storage::StorageException& exception) { + if (exception.StatusCode == Http::HttpStatusCode::BadRequest && + exception.ErrorCode == "InvalidQueryParameterValue") { + auto param_name = exception.AdditionalInformation.find("QueryParameterName"); + if (param_name != exception.AdditionalInformation.end() && + param_name->second == "comp") { + return ExceptionToStatus( + exception, "The 'rename' operation is not supported on containers. ", + "Attempting a rename of '", src.container, "' to '", dest.container, + "': ", blob_service_client_->GetUrl()); + } + } + return ExceptionToStatus(exception, "Failed to rename container '", src.container, + "' to '", dest.container, + "': ", blob_service_client_->GetUrl()); + } + } else if (dest_is_empty) { + // Even if we deleted the empty dest.container, RenameBlobContainer() would still + // fail because containers are not immediately deleted by the service -- they are + // deleted asynchronously based on retention policies and non-deterministic behavior + // of the garbage collection process. 
+ // + // One way to still match POSIX rename semantics is to delete the src.container if + // and only if it's empty because the final state would be equivalent to replacing + // the dest.container with the src.container and its contents (which happens + // to also be empty). + Blobs::ListBlobsOptions list_blobs_options; + list_blobs_options.PageSizeHint = 1; + try { + auto src_list_response = src_container_client.ListBlobs(list_blobs_options); + if (!src_list_response.Blobs.empty()) { + return Status::IOError("Unable to replace empty container: '", dest.all, "'"); Review Comment: ```suggestion return Status::IOError("Unable to replace by non empty container: '", src.all, "'"); ``` ########## cpp/src/arrow/filesystem/azurefs_test.cc: ########## @@ -854,6 +888,395 @@ class TestAzureFileSystem : public ::testing::Test { const auto directory_path = data.RandomDirectoryPath(rng_); ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false)); } + + private: + using StringMatcher = + ::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>; + + StringMatcher HasDirMoveToSubdirMessage(const std::string& src, + const std::string& dest) { + return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + src + + "' a sub-directory of itself."); + } + + StringMatcher HasCrossContainerNotImplementedMessage(const std::string& container_name, + const std::string& dest) { + return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest + + "' requires moving data between " + "containers, which is not implemented."); + } + + StringMatcher HasMissingParentDirMessage(const std::string& dest) { + return ::testing::HasSubstr("The parent directory of the destination path '" + dest + + "' does not exist."); + } + + /// \brief Expected POSIX semantics for the rename operation on multiple + /// scenarios. 
+ /// + /// If the src doesn't exist, the error is always ENOENT, otherwise we are + /// left with the following combinations: + /// + /// 1. src's type + /// a. File + /// b. Directory + /// 2. dest's existence + /// a. NotFound + /// b. File + /// c. Directory + /// - empty + /// - non-empty + /// 3. src path has a trailing slash (or not) + /// 4. dest path has a trailing slash (or not) + /// + /// Limitations: this function doesn't consider paths so it assumes that the + /// paths don't lead requests for moves that would make the source a subdir of + /// the destination. + /// + /// \param paths_are_equal src and dest paths without trailing slashes are equal + /// \return std::nullopt if success is expected in the scenario or the errno + /// if failure is expected. + static std::optional<int> RenameSemantics(FileType src_type, bool src_trailing_slash, + FileType dest_type, bool dest_trailing_slash, + bool dest_is_empty_dir = false, + bool paths_are_equal = false) { + DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown); + DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory) + << "dest_is_empty_dir must imply dest_type == FileType::Directory"; + switch (src_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + return {ENOENT}; + case FileType::File: + switch (dest_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + if (src_trailing_slash) { + return {ENOTDIR}; + } + if (dest_trailing_slash) { + // A slash on the destination path requires that it exists, + // so a confirmation that it's a directory can be performed. + return {ENOENT}; + } + return {}; + case FileType::File: + if (src_trailing_slash || dest_trailing_slash) { + return {ENOTDIR}; + } + // The existing file is replaced successfuly. 
+ return {}; + case FileType::Directory: + if (src_trailing_slash) { + return {ENOTDIR}; + } + return EISDIR; + } + break; + case FileType::Directory: + switch (dest_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + // We don't have to care about the slashes when the source is a directory. + return {}; + case FileType::File: + return {ENOTDIR}; + case FileType::Directory: + if (!paths_are_equal && !dest_is_empty_dir) { + return {ENOTEMPTY}; + } + return {}; + } + break; + } + Unreachable("Invalid parameters passed to RenameSemantics"); + } + + Status CheckExpectedErrno(const std::string& src, const std::string& dest, + std::optional<int> expected_errno, + const char* expected_errno_name, FileInfo* out_src_info) { + auto the_fs = fs(); + const bool src_trailing_slash = internal::HasTrailingSlash(src); + const bool dest_trailing_slash = internal::HasTrailingSlash(dest); + const auto src_path = std::string{internal::RemoveTrailingSlash(src)}; + const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)}; + ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path)); + ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path)); + bool dest_is_empty_dir = false; + if (dest_info.type() == FileType::Directory) { + FileSelector select; + select.base_dir = dest_path; + select.recursive = false; + // TODO(felipecrv): investigate why this can't be false + select.allow_not_found = true; Review Comment: Do you want to solve this in this PR? If you want to defer this to a follow-up task, could you create an issue for it? ########## cpp/src/arrow/filesystem/azurefs.cc: ########## @@ -1690,17 +1924,438 @@ class AzureFileSystem::Impl { } } + private: + /// \brief Create a BlobLeaseClient and acquire a lease on the container. 
+ /// + /// \param allow_missing_container if true, a nullptr may be returned when the container + /// doesn't exist, otherwise a PathNotFound(location) error is produced right away + /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and + /// optional (nullptr denotes container not found) + Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireContainerLease( + const AzureLocation& location, std::chrono::seconds lease_duration, + bool allow_missing_container = false, bool retry_allowed = true) { + DCHECK(!location.container.empty()); + auto container_client = GetBlobContainerClient(location.container); + auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId(); + auto container_url = container_client.GetUrl(); + auto lease_client = std::make_unique<Blobs::BlobLeaseClient>( + std::move(container_client), std::move(lease_id)); + try { + [[maybe_unused]] auto result = lease_client->Acquire(lease_duration); + DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId()); + } catch (const Storage::StorageException& exception) { + if (IsContainerNotFound(exception)) { + if (allow_missing_container) { + return nullptr; + } + return PathNotFound(location); + } else if (exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "LeaseAlreadyPresent") { + if (retry_allowed) { + LeaseGuard::WaitUntilLatestKnownExpiryTime(); + return AcquireContainerLease(location, lease_duration, allow_missing_container, + /*retry_allowed=*/false); + } + } + return ExceptionToStatus(exception, "Failed to acquire a lease on container '", + location.container, "': ", container_url); + } + return lease_client; + } + + /// \brief Create a BlobLeaseClient and acquire a lease on a blob/file (or + /// directory if Hierarchical Namespace is supported). 
+ /// + /// \param allow_missing if true, a nullptr may be returned when the blob + /// doesn't exist, otherwise a PathNotFound(location) error is produced right away + /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and + /// optional (nullptr denotes blob not found) + Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireBlobLease( + const AzureLocation& location, std::chrono::seconds lease_duration, + bool allow_missing = false, bool retry_allowed = true) { + DCHECK(!location.container.empty() && !location.path.empty()); + auto path = std::string{internal::RemoveTrailingSlash(location.path)}; + auto blob_client = GetBlobClient(location.container, std::move(path)); + auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId(); + auto blob_url = blob_client.GetUrl(); + auto lease_client = std::make_unique<Blobs::BlobLeaseClient>(std::move(blob_client), + std::move(lease_id)); + try { + [[maybe_unused]] auto result = lease_client->Acquire(lease_duration); + DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId()); + } catch (const Storage::StorageException& exception) { + if (exception.StatusCode == Http::HttpStatusCode::NotFound) { + if (allow_missing) { + return nullptr; + } + return PathNotFound(location); + } else if (exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "LeaseAlreadyPresent") { + if (retry_allowed) { + LeaseGuard::WaitUntilLatestKnownExpiryTime(); + return AcquireBlobLease(location, lease_duration, allow_missing, + /*retry_allowed=*/false); + } + } + return ExceptionToStatus(exception, "Failed to acquire a lease on file '", + location.all, "': ", blob_url); + } + return lease_client; + } + + /// \brief The default lease duration used for acquiring a lease on a container or blob. + /// + /// Azure Storage leases can be acquired for a duration of 15 to 60 seconds. 
+ /// + /// Operations consisting of an unpredictable number of sub-operations should + /// renew the lease periodically (heartbeat pattern) instead of acquiring an + /// infinite lease (very bad idea for a library like Arrow). + static constexpr auto kLeaseDuration = std::chrono::seconds{15}; + + // These are conservative estimates of how long it takes for the client + // request to reach the server counting from the moment the Azure SDK function + // issuing the request is called. See their usage for more context. + // + // If the client connection to the server is unpredictably slow, operations + // may fail, but due to the use of leases, the entire arrow::FileSystem + // operation can be retried without risk of data loss. Thus, unexpected network + // slow downs can be fixed with retries (either by some system using Arrow or + // an user interactively retrying a failed operation). + // + // If a network is permanently slow, the lease time and these numbers should be + // increased but not so much so that the client gives up an operation because the + // values say it takes more time to reach the server than the remaining lease + // time on the resources. + // + // NOTE: The initial constant values were chosen conservatively. If we learn, + // from experience, that they are causing issues, we can increase them. And if + // broadly applicable values aren't possible, we can make them configurable. + static constexpr auto kTimeNeededForContainerDeletion = std::chrono::seconds{3}; + static constexpr auto kTimeNeededForContainerRename = std::chrono::seconds{3}; + static constexpr auto kTimeNeededForFileOrDirectoryRename = std::chrono::seconds{3}; + + public: + /// The conditions for a successful container rename are derived from the + /// conditions for a successful `Move("/$src.container", "/$dest.container")`. + /// The numbers here match the list in `Move`. + /// + /// 1. `src.container` must exist. + /// 2. 
If `src.container` and `dest.container` are the same, do nothing and + /// return OK. + /// 3. N/A. + /// 4. N/A. + /// 5. `dest.container` doesn't exist or is empty. + /// NOTE: one exception related to container Move is that when the + /// destination is empty we also require the source container to be empty, + /// because the only way to perform the "Move" is by deleting the source + /// instead of deleting the destination and renaming the source. + Status RenameContainer(const AzureLocation& src, const AzureLocation& dest) { + DCHECK(!src.container.empty() && src.path.empty()); + DCHECK(!dest.container.empty() && dest.path.empty()); + auto src_container_client = GetBlobContainerClient(src.container); + + // If src and dest are the same, we only have to check src exists. + if (src.container == dest.container) { + ARROW_ASSIGN_OR_RAISE(auto info, + GetContainerPropsAsFileInfo(src, src_container_client)); + if (info.type() == FileType::NotFound) { + return PathNotFound(src); + } + DCHECK(info.type() == FileType::Directory); + return Status::OK(); + } + + // Acquire a lease on the source container because (1) we need the lease + // before rename and (2) it works as a way of checking the container exists. + ARROW_ASSIGN_OR_RAISE(auto src_lease_client, + AcquireContainerLease(src, kLeaseDuration)); + LeaseGuard src_lease_guard{std::move(src_lease_client), kLeaseDuration}; + // Check dest.container doesn't exist or is empty. + auto dest_container_client = GetBlobContainerClient(dest.container); + std::optional<LeaseGuard> dest_lease_guard; + bool dest_exists = false; + bool dest_is_empty = false; + ARROW_ASSIGN_OR_RAISE( + auto dest_lease_client, + AcquireContainerLease(dest, kLeaseDuration, /*allow_missing_container*/ true)); + if (dest_lease_client) { + dest_lease_guard.emplace(std::move(dest_lease_client), kLeaseDuration); + dest_exists = true; + // Emptiness check after successful acquisition of the lease. 
+ Blobs::ListBlobsOptions list_blobs_options; + list_blobs_options.PageSizeHint = 1; + try { + auto dest_list_response = dest_container_client.ListBlobs(list_blobs_options); + dest_is_empty = dest_list_response.Blobs.empty(); + if (!dest_is_empty) { + return NotEmpty(dest); + } + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to check that '", dest.container, + "' is empty: ", dest_container_client.GetUrl()); + } + } + DCHECK(!dest_exists || dest_is_empty); + + if (!dest_exists) { + // Rename the source container. + Blobs::RenameBlobContainerOptions options; + options.SourceAccessConditions.LeaseId = src_lease_guard.LeaseId(); + try { + src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerRename); + blob_service_client_->RenameBlobContainer(src.container, dest.container, options); + src_lease_guard.Forget(); + } catch (const Storage::StorageException& exception) { + if (exception.StatusCode == Http::HttpStatusCode::BadRequest && + exception.ErrorCode == "InvalidQueryParameterValue") { + auto param_name = exception.AdditionalInformation.find("QueryParameterName"); + if (param_name != exception.AdditionalInformation.end() && + param_name->second == "comp") { + return ExceptionToStatus( + exception, "The 'rename' operation is not supported on containers. ", + "Attempting a rename of '", src.container, "' to '", dest.container, + "': ", blob_service_client_->GetUrl()); + } + } + return ExceptionToStatus(exception, "Failed to rename container '", src.container, + "' to '", dest.container, + "': ", blob_service_client_->GetUrl()); + } + } else if (dest_is_empty) { + // Even if we deleted the empty dest.container, RenameBlobContainer() would still + // fail because containers are not immediately deleted by the service -- they are + // deleted asynchronously based on retention policies and non-deterministic behavior + // of the garbage collection process. 
+ // + // One way to still match POSIX rename semantics is to delete the src.container if + // and only if it's empty because the final state would be equivalent to replacing + // the dest.container with the src.container and its contents (which happens + // to also be empty). + Blobs::ListBlobsOptions list_blobs_options; + list_blobs_options.PageSizeHint = 1; + try { + auto src_list_response = src_container_client.ListBlobs(list_blobs_options); + if (!src_list_response.Blobs.empty()) { + return Status::IOError("Unable to replace empty container: '", dest.all, "'"); + } + // Delete the source container now that we know it's empty. + Blobs::DeleteBlobContainerOptions options; + options.AccessConditions.LeaseId = src_lease_guard.LeaseId(); + DCHECK(dest_lease_guard.has_value()); + // Make sure lease on dest is still valid before deleting src. This is to ensure + // the destination container is not deleted by another process/client before + // Move() returns. + if (!dest_lease_guard->StillValidFor(kTimeNeededForContainerDeletion)) { + return Status::IOError("Unable to replace empty container: '", dest.all, "'. ", + "Preparation for the operation took too long and a " + "container lease expired."); + } + try { + src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerDeletion); + src_container_client.DeleteIfExists(options); + src_lease_guard.Forget(); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to delete empty container '", Review Comment: ```suggestion return ExceptionToStatus(exception, "Failed to delete empty container: '", ``` ########## cpp/src/arrow/filesystem/azurefs.cc: ########## @@ -942,6 +972,185 @@ FileInfo FileInfoFromBlob(std::string_view container, return info; } +/// \brief RAII-style guard for releasing a lease on a blob or container. +/// +/// The guard should be constructed right after a successful BlobLeaseClient::Acquire() +/// call. 
Use std::optional<LeaseGuard> to declare a guard in outer scope and construct it +/// later with std::optional::emplace(...). +/// +/// Leases expire automatically, but explicit release means concurrent clients or +/// ourselves when trying new operations on the same blob or container don't have +/// to wait for the lease to expire by itself. +/// +/// Learn more about leases at +/// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob +class LeaseGuard { + public: + using SteadyClock = std::chrono::steady_clock; + + private: + /// \brief The time when the lease expires or is broken. + /// + /// The lease is not guaranteed to be valid until this time, but it is guaranteed to + /// be expired after this time. In other words, this is an overestimation of + /// the true time_point. + SteadyClock::time_point break_or_expires_at_; + const std::unique_ptr<Blobs::BlobLeaseClient> lease_client_; + bool release_attempt_pending_ = true; + + /// \brief The latest known expiry time of a lease guarded by this class + /// that failed to be released or was forgotten by calling Forget(). + static std::atomic<SteadyClock::time_point> latest_known_expiry_time_; + + /// \brief The maximum lease duration supported by Azure Storage. + static constexpr std::chrono::seconds kMaxLeaseDuration{60}; + + public: + LeaseGuard(std::unique_ptr<Blobs::BlobLeaseClient> lease_client, + std::chrono::seconds lease_duration) + : break_or_expires_at_(SteadyClock::now() + + std::min(kMaxLeaseDuration, lease_duration)), + lease_client_(std::move(lease_client)) { + DCHECK(lease_duration <= kMaxLeaseDuration); + DCHECK(this->lease_client_); + } + + ARROW_DISALLOW_COPY_AND_ASSIGN(LeaseGuard); + + ~LeaseGuard() { + // No point in trying any error handling here other than the debug checking. The lease + // will eventually expire on the backend without any intervention from us (just much + // later than if we released it). 
+ [[maybe_unused]] auto status = Release(); + ARROW_LOG(DEBUG) << status; + } + + bool PendingRelease() const { + return release_attempt_pending_ && SteadyClock::now() <= break_or_expires_at_; + } + + private: + Status DoRelease() { + DCHECK(release_attempt_pending_); + try { + lease_client_->Release(); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to release the ", + lease_client_->GetLeaseId(), " lease"); + } + return Status::OK(); + } + + public: + std::string LeaseId() const { return lease_client_->GetLeaseId(); } + + bool StillValidFor(SteadyClock::duration expected_time_left) const { + return SteadyClock::now() + expected_time_left < break_or_expires_at_; + } + + /// \brief Break the lease. + /// + /// The lease will stay in the "Breaking" state for break_period seconds or + /// less if the lease is expiring before that. + /// + /// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-container#outcomes-of-use-attempts-on-containers-by-lease-state + /// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob#outcomes-of-use-attempts-on-blobs-by-lease-state + Status Break(Azure::Nullable<std::chrono::seconds> break_period = {}) { + auto remaining_time_ms = [this]() { + const auto remaing_time = break_or_expires_at_ - SteadyClock::now(); + return std::chrono::duration_cast<std::chrono::milliseconds>(remaing_time); + }; +#ifndef NDEBUG + if (break_period.HasValue() && !StillValidFor(*break_period)) { + ARROW_LOG(WARNING) + << "Azure Storage: requested break_period (" + << break_period.ValueOr(std::chrono::seconds{0}).count() + << "s) is too long or lease duration is too short for all the operations " + "performed so far (lease expires in " + << remaining_time_ms().count() << "ms)"; + } +#endif + Blobs::BreakLeaseOptions options; + options.BreakPeriod = break_period; + try { + lease_client_->Break(options); + break_or_expires_at_ = + std::min(break_or_expires_at_, + 
SteadyClock::now() + break_period.ValueOr(std::chrono::seconds{0})); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to break the ", + lease_client_->GetLeaseId(), " lease expiring in ", + remaining_time_ms().count(), "ms"); + } + return Status::OK(); + } + + /// \brief Break the lease before deleting or renaming the resource. + /// + /// Calling this is recommended when the resource for which the lease was acquired is + /// about to be deleted as there is no way of releasing the lease after that, we can + /// only forget about it. The break_period should be a conservative estimate of the time + /// it takes to delete/rename the resource. + /// + /// If break_period is too small, the delete/rename will fail with a lease conflict, + /// and if it's too large the only consequence is that a lease on a non-existent + /// resource will remain in the "Breaking" state for a while blocking others + /// from recreating the resource. + void BreakBeforeDeletion(std::chrono::seconds break_period) { + ARROW_CHECK_OK(Break(break_period)); + } + + // These functions are marked ARROW_NOINLINE because they are called from + // multiple locations, but are not performance-critical. 
+ Review Comment: ```suggestion ``` ########## cpp/src/arrow/filesystem/azurefs_test.cc: ########## @@ -854,6 +888,395 @@ class TestAzureFileSystem : public ::testing::Test { const auto directory_path = data.RandomDirectoryPath(rng_); ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false)); } + + private: + using StringMatcher = + ::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>; + + StringMatcher HasDirMoveToSubdirMessage(const std::string& src, + const std::string& dest) { + return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + src + + "' a sub-directory of itself."); + } + + StringMatcher HasCrossContainerNotImplementedMessage(const std::string& container_name, + const std::string& dest) { + return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest + + "' requires moving data between " + "containers, which is not implemented."); + } + + StringMatcher HasMissingParentDirMessage(const std::string& dest) { + return ::testing::HasSubstr("The parent directory of the destination path '" + dest + + "' does not exist."); + } + + /// \brief Expected POSIX semantics for the rename operation on multiple + /// scenarios. + /// + /// If the src doesn't exist, the error is always ENOENT, otherwise we are + /// left with the following combinations: + /// + /// 1. src's type + /// a. File + /// b. Directory + /// 2. dest's existence + /// a. NotFound + /// b. File + /// c. Directory + /// - empty + /// - non-empty + /// 3. src path has a trailing slash (or not) + /// 4. dest path has a trailing slash (or not) + /// + /// Limitations: this function doesn't consider paths so it assumes that the + /// paths don't lead requests for moves that would make the source a subdir of + /// the destination. + /// + /// \param paths_are_equal src and dest paths without trailing slashes are equal + /// \return std::nullopt if success is expected in the scenario or the errno + /// if failure is expected. 
+ static std::optional<int> RenameSemantics(FileType src_type, bool src_trailing_slash, + FileType dest_type, bool dest_trailing_slash, + bool dest_is_empty_dir = false, + bool paths_are_equal = false) { + DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown); + DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory) + << "dest_is_empty_dir must imply dest_type == FileType::Directory"; + switch (src_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + return {ENOENT}; + case FileType::File: + switch (dest_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + if (src_trailing_slash) { + return {ENOTDIR}; + } + if (dest_trailing_slash) { + // A slash on the destination path requires that it exists, + // so a confirmation that it's a directory can be performed. + return {ENOENT}; + } + return {}; + case FileType::File: + if (src_trailing_slash || dest_trailing_slash) { + return {ENOTDIR}; + } + // The existing file is replaced successfuly. + return {}; + case FileType::Directory: + if (src_trailing_slash) { + return {ENOTDIR}; + } + return EISDIR; + } + break; + case FileType::Directory: + switch (dest_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + // We don't have to care about the slashes when the source is a directory. 
+ return {}; + case FileType::File: + return {ENOTDIR}; + case FileType::Directory: + if (!paths_are_equal && !dest_is_empty_dir) { + return {ENOTEMPTY}; + } + return {}; + } + break; + } + Unreachable("Invalid parameters passed to RenameSemantics"); + } + + Status CheckExpectedErrno(const std::string& src, const std::string& dest, + std::optional<int> expected_errno, + const char* expected_errno_name, FileInfo* out_src_info) { + auto the_fs = fs(); + const bool src_trailing_slash = internal::HasTrailingSlash(src); + const bool dest_trailing_slash = internal::HasTrailingSlash(dest); + const auto src_path = std::string{internal::RemoveTrailingSlash(src)}; + const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)}; + ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path)); + ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path)); + bool dest_is_empty_dir = false; + if (dest_info.type() == FileType::Directory) { + FileSelector select; + select.base_dir = dest_path; + select.recursive = false; + // TODO(felipecrv): investigate why this can't be false + select.allow_not_found = true; + ARROW_ASSIGN_OR_RAISE(auto dest_contents, the_fs->GetFileInfo(select)); + if (dest_contents.empty()) { + dest_is_empty_dir = true; + } + } + auto paths_are_equal = src_path == dest_path; + auto truly_expected_errno = + RenameSemantics(out_src_info->type(), src_trailing_slash, dest_info.type(), + dest_trailing_slash, dest_is_empty_dir, paths_are_equal); + if (truly_expected_errno != expected_errno) { + if (expected_errno.has_value()) { + return Status::Invalid("expected_errno=", expected_errno_name, "=", + *expected_errno, + " used in ASSERT_MOVE is incorrect. 
" + "POSIX semantics for this scenario require errno=", + strerror(truly_expected_errno.value_or(0))); + } else { + DCHECK(truly_expected_errno.has_value()); + return Status::Invalid( + "ASSERT_MOVE used to assert success in a scenario for which " + "POSIX semantics requires errno=", + strerror(*truly_expected_errno)); + } + } + return Status::OK(); + } + + void AssertAfterMove(const std::string& src, const std::string& dest, FileType type) { + if (internal::RemoveTrailingSlash(src) != internal::RemoveTrailingSlash(dest)) { + AssertFileInfo(fs(), src, FileType::NotFound); + } + AssertFileInfo(fs(), dest, type); + } + + static bool WithErrno(const Status& status, int expected_errno) { + auto* detail = status.detail().get(); + return detail && + arrow::internal::ErrnoFromStatusDetail(*detail).value_or(-1) == expected_errno; + } + + std::optional<StringMatcher> MoveErrorMessageMatcher(const FileInfo& src_info, + const std::string& src, + const std::string& dest, + int for_errno) { + switch (for_errno) { + case ENOENT: { + auto& path = src_info.type() == FileType::NotFound ? src : dest; + return ::testing::HasSubstr("Path does not exist '" + path + "'"); + } + case ENOTEMPTY: + return ::testing::HasSubstr("Directory not empty: '" + dest + "'"); + } + return std::nullopt; + } + +#define ASSERT_MOVE(src, dest, expected_errno) \ + do { \ + auto _src = (src); \ + auto _dest = (dest); \ + std::optional<int> _expected_errno = (expected_errno); \ + FileInfo _src_info; \ + ASSERT_OK( \ + CheckExpectedErrno(_src, _dest, _expected_errno, #expected_errno, &_src_info)); \ + auto _move_st = ::arrow::internal::GenericToStatus(fs()->Move(_src, _dest)); \ + if (_expected_errno.has_value()) { \ + if (WithErrno(_move_st, *_expected_errno)) { \ + /* If the Move failed, the source should remain unchanged. 
*/ \ + AssertFileInfo(fs(), std::string{internal::RemoveTrailingSlash(_src)}, \ + _src_info.type()); \ + auto _message_matcher = \ + MoveErrorMessageMatcher(_src_info, _src, _dest, *_expected_errno); \ + if (_message_matcher.has_value()) { \ + EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, *_message_matcher, _move_st); \ + } else { \ + SUCCEED(); \ + } \ + } else { \ + FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) \ + << "' did not fail with errno=" << #expected_errno; \ + } \ + } else { \ + if (!_move_st.ok()) { \ + FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) \ + << "' failed with " << _move_st.ToString(); \ + } else { \ + AssertAfterMove(_src, _dest, _src_info.type()); \ + } \ + } \ + } while (false) + +#define ASSERT_MOVE_OK(src, dest) ASSERT_MOVE((src), (dest), std::nullopt) + + // Tests for Move() + + public: + void TestRenameContainer() { + EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv()); + auto data = SetUpPreexistingData(); + // Container exists, so renaming to the same name succeeds because it's a no-op. + ASSERT_MOVE_OK(data.container_name, data.container_name); + // Renaming a container that doesn't exist fails. + ASSERT_MOVE("missing-container", "missing-container", ENOENT); + ASSERT_MOVE("missing-container", data.container_name, ENOENT); + // Renaming a container to an existing non-empty container fails. 
+ auto non_empty_container = PreexistingData::RandomContainerName(rng_); + auto non_empty_container_client = CreateContainer(non_empty_container); + CreateBlob(non_empty_container_client, "object1", PreexistingData::kLoremIpsum); + ASSERT_MOVE(data.container_name, non_empty_container, ENOTEMPTY); + // Renaming to an empty container fails to replace it + auto empty_container = PreexistingData::RandomContainerName(rng_); + auto empty_container_client = CreateContainer(empty_container); + EXPECT_RAISES_WITH_MESSAGE_THAT( + IOError, + ::testing::HasSubstr("Unable to replace empty container: '" + empty_container + + "'"), + fs()->Move(data.container_name, empty_container)); + // Renaming to a non-existing container creates it + auto missing_container = PreexistingData::RandomContainerName(rng_); + AssertFileInfo(fs(), missing_container, FileType::NotFound); + if (env->backend() == AzureBackend::kAzurite) { + // Azurite returns a 201 Created for RenameBlobContainer, but the created + // container doesn't contain the blobs from the source container and + // the source container remains undeleted after the "rename". 
+ } else { + // See Azure SDK issue/question: + // https://github.com/Azure/azure-sdk-for-cpp/issues/5262 + EXPECT_RAISES_WITH_MESSAGE_THAT( + IOError, + ::testing::HasSubstr("The 'rename' operation is not supported on containers."), + fs()->Move(data.container_name, missing_container)); + // ASSERT_MOVE_OK(data.container_name, missing_container); + // AssertFileInfo(fs(), + // ConcatAbstractPath(missing_container, + // PreexistingData::kObjectName), FileType::File); + } + // Renaming to an empty container can work if the source is also empty + auto new_empty_container = PreexistingData::RandomContainerName(rng_); + auto new_empty_container_client = CreateContainer(new_empty_container); + ASSERT_MOVE_OK(empty_container, new_empty_container); + } + + void TestMoveContainerToPath() { + auto data = SetUpPreexistingData(); + ASSERT_MOVE("missing-container", data.ContainerPath("new-subdir"), ENOENT); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + HasDirMoveToSubdirMessage(data.container_name, data.ContainerPath("new-subdir")), + fs()->Move(data.container_name, data.ContainerPath("new-subdir"))); + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, + HasCrossContainerNotImplementedMessage(data.container_name, + "a-container/new-subdir"), + fs()->Move(data.container_name, "a-container/new-subdir")); + } + + void TestCreateContainerFromPath() { + auto data = SetUpPreexistingData(); + auto missing_path = data.RandomDirectoryPath(rng_); + ASSERT_MOVE(missing_path, "new-container", ENOENT); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + ::testing::HasSubstr("Creating files at '/' is not possible, only directories."), + fs()->Move(data.ObjectPath(), "new-file")); + auto src_dir_path = data.RandomDirectoryPath(rng_); + ASSERT_OK(fs()->CreateDir(src_dir_path, false)); + EXPECT_OK_AND_ASSIGN(auto src_dir_info, fs()->GetFileInfo(src_dir_path)); + EXPECT_EQ(src_dir_info.type(), FileType::Directory); + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, + 
HasCrossContainerNotImplementedMessage(src_dir_path, "new-container"), + fs()->Move(src_dir_path, "new-container")); + } + + void TestMovePaths() { + Status st; + auto data = SetUpPreexistingData(); + // When source doesn't exist. + ASSERT_MOVE("missing-container/src-path", "a-container/dest-path", ENOENT); + auto missing_path1 = data.RandomDirectoryPath(rng_); + ASSERT_MOVE(missing_path1, "missing-container/path", ENOENT); + + // But when source exists... + if (!WithHierarchicalNamespace()) { + // ...and containers are different, we get an error message telling cross-container + // moves are not implemented. + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, + HasCrossContainerNotImplementedMessage(data.ObjectPath(), + "missing-container/path"), + fs()->Move(data.ObjectPath(), "missing-container/path")); + GTEST_SKIP() + << "The rest of TestMovePaths is not implemented for non-HNS scenarios"; Review Comment: Can we just `return` here instead? Is `GTEST_SKIP()` appropriate here, given that we do have some tests for the non-HNS case? 
########## cpp/src/arrow/filesystem/azurefs_test.cc: ########## @@ -854,6 +888,395 @@ class TestAzureFileSystem : public ::testing::Test { const auto directory_path = data.RandomDirectoryPath(rng_); ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false)); } + + private: + using StringMatcher = + ::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>; + + StringMatcher HasDirMoveToSubdirMessage(const std::string& src, + const std::string& dest) { + return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + src + + "' a sub-directory of itself."); + } + + StringMatcher HasCrossContainerNotImplementedMessage(const std::string& container_name, + const std::string& dest) { + return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest + + "' requires moving data between " + "containers, which is not implemented."); + } + + StringMatcher HasMissingParentDirMessage(const std::string& dest) { + return ::testing::HasSubstr("The parent directory of the destination path '" + dest + + "' does not exist."); + } + + /// \brief Expected POSIX semantics for the rename operation on multiple + /// scenarios. + /// + /// If the src doesn't exist, the error is always ENOENT, otherwise we are + /// left with the following combinations: + /// + /// 1. src's type + /// a. File + /// b. Directory + /// 2. dest's existence + /// a. NotFound + /// b. File + /// c. Directory + /// - empty + /// - non-empty + /// 3. src path has a trailing slash (or not) + /// 4. dest path has a trailing slash (or not) + /// + /// Limitations: this function doesn't consider paths so it assumes that the + /// paths don't lead requests for moves that would make the source a subdir of + /// the destination. + /// + /// \param paths_are_equal src and dest paths without trailing slashes are equal + /// \return std::nullopt if success is expected in the scenario or the errno + /// if failure is expected. 
+ static std::optional<int> RenameSemantics(FileType src_type, bool src_trailing_slash, + FileType dest_type, bool dest_trailing_slash, + bool dest_is_empty_dir = false, + bool paths_are_equal = false) { + DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown); + DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory) + << "dest_is_empty_dir must imply dest_type == FileType::Directory"; + switch (src_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + return {ENOENT}; + case FileType::File: + switch (dest_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + if (src_trailing_slash) { + return {ENOTDIR}; + } + if (dest_trailing_slash) { + // A slash on the destination path requires that it exists, + // so a confirmation that it's a directory can be performed. + return {ENOENT}; + } + return {}; + case FileType::File: + if (src_trailing_slash || dest_trailing_slash) { + return {ENOTDIR}; + } + // The existing file is replaced successfuly. + return {}; + case FileType::Directory: + if (src_trailing_slash) { + return {ENOTDIR}; + } + return EISDIR; + } + break; + case FileType::Directory: + switch (dest_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + // We don't have to care about the slashes when the source is a directory. 
+ return {}; + case FileType::File: + return {ENOTDIR}; + case FileType::Directory: + if (!paths_are_equal && !dest_is_empty_dir) { + return {ENOTEMPTY}; + } + return {}; + } + break; + } + Unreachable("Invalid parameters passed to RenameSemantics"); + } + + Status CheckExpectedErrno(const std::string& src, const std::string& dest, + std::optional<int> expected_errno, + const char* expected_errno_name, FileInfo* out_src_info) { + auto the_fs = fs(); + const bool src_trailing_slash = internal::HasTrailingSlash(src); + const bool dest_trailing_slash = internal::HasTrailingSlash(dest); + const auto src_path = std::string{internal::RemoveTrailingSlash(src)}; + const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)}; + ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path)); + ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path)); + bool dest_is_empty_dir = false; + if (dest_info.type() == FileType::Directory) { + FileSelector select; + select.base_dir = dest_path; + select.recursive = false; + // TODO(felipecrv): investigate why this can't be false + select.allow_not_found = true; + ARROW_ASSIGN_OR_RAISE(auto dest_contents, the_fs->GetFileInfo(select)); + if (dest_contents.empty()) { + dest_is_empty_dir = true; + } + } + auto paths_are_equal = src_path == dest_path; + auto truly_expected_errno = + RenameSemantics(out_src_info->type(), src_trailing_slash, dest_info.type(), + dest_trailing_slash, dest_is_empty_dir, paths_are_equal); + if (truly_expected_errno != expected_errno) { + if (expected_errno.has_value()) { + return Status::Invalid("expected_errno=", expected_errno_name, "=", + *expected_errno, + " used in ASSERT_MOVE is incorrect. 
" + "POSIX semantics for this scenario require errno=", + strerror(truly_expected_errno.value_or(0))); + } else { + DCHECK(truly_expected_errno.has_value()); + return Status::Invalid( + "ASSERT_MOVE used to assert success in a scenario for which " + "POSIX semantics requires errno=", + strerror(*truly_expected_errno)); + } + } + return Status::OK(); + } + + void AssertAfterMove(const std::string& src, const std::string& dest, FileType type) { + if (internal::RemoveTrailingSlash(src) != internal::RemoveTrailingSlash(dest)) { + AssertFileInfo(fs(), src, FileType::NotFound); + } + AssertFileInfo(fs(), dest, type); + } + + static bool WithErrno(const Status& status, int expected_errno) { + auto* detail = status.detail().get(); + return detail && + arrow::internal::ErrnoFromStatusDetail(*detail).value_or(-1) == expected_errno; + } + + std::optional<StringMatcher> MoveErrorMessageMatcher(const FileInfo& src_info, + const std::string& src, + const std::string& dest, + int for_errno) { + switch (for_errno) { + case ENOENT: { + auto& path = src_info.type() == FileType::NotFound ? src : dest; + return ::testing::HasSubstr("Path does not exist '" + path + "'"); + } + case ENOTEMPTY: + return ::testing::HasSubstr("Directory not empty: '" + dest + "'"); + } + return std::nullopt; + } + +#define ASSERT_MOVE(src, dest, expected_errno) \ + do { \ + auto _src = (src); \ + auto _dest = (dest); \ + std::optional<int> _expected_errno = (expected_errno); \ + FileInfo _src_info; \ + ASSERT_OK( \ + CheckExpectedErrno(_src, _dest, _expected_errno, #expected_errno, &_src_info)); \ + auto _move_st = ::arrow::internal::GenericToStatus(fs()->Move(_src, _dest)); \ + if (_expected_errno.has_value()) { \ + if (WithErrno(_move_st, *_expected_errno)) { \ + /* If the Move failed, the source should remain unchanged. 
*/ \ + AssertFileInfo(fs(), std::string{internal::RemoveTrailingSlash(_src)}, \ + _src_info.type()); \ + auto _message_matcher = \ + MoveErrorMessageMatcher(_src_info, _src, _dest, *_expected_errno); \ + if (_message_matcher.has_value()) { \ + EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, *_message_matcher, _move_st); \ + } else { \ + SUCCEED(); \ + } \ + } else { \ + FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) \ + << "' did not fail with errno=" << #expected_errno; \ + } \ + } else { \ + if (!_move_st.ok()) { \ + FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) \ + << "' failed with " << _move_st.ToString(); \ + } else { \ + AssertAfterMove(_src, _dest, _src_info.type()); \ + } \ + } \ + } while (false) + +#define ASSERT_MOVE_OK(src, dest) ASSERT_MOVE((src), (dest), std::nullopt) + + // Tests for Move() + + public: + void TestRenameContainer() { + EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv()); + auto data = SetUpPreexistingData(); + // Container exists, so renaming to the same name succeeds because it's a no-op. + ASSERT_MOVE_OK(data.container_name, data.container_name); + // Renaming a container that doesn't exist fails. + ASSERT_MOVE("missing-container", "missing-container", ENOENT); + ASSERT_MOVE("missing-container", data.container_name, ENOENT); + // Renaming a container to an existing non-empty container fails. 
+ auto non_empty_container = PreexistingData::RandomContainerName(rng_); + auto non_empty_container_client = CreateContainer(non_empty_container); + CreateBlob(non_empty_container_client, "object1", PreexistingData::kLoremIpsum); + ASSERT_MOVE(data.container_name, non_empty_container, ENOTEMPTY); + // Renaming to an empty container fails to replace it + auto empty_container = PreexistingData::RandomContainerName(rng_); + auto empty_container_client = CreateContainer(empty_container); + EXPECT_RAISES_WITH_MESSAGE_THAT( + IOError, + ::testing::HasSubstr("Unable to replace empty container: '" + empty_container + + "'"), + fs()->Move(data.container_name, empty_container)); + // Renaming to a non-existing container creates it + auto missing_container = PreexistingData::RandomContainerName(rng_); + AssertFileInfo(fs(), missing_container, FileType::NotFound); + if (env->backend() == AzureBackend::kAzurite) { + // Azurite returns a 201 Created for RenameBlobContainer, but the created + // container doesn't contain the blobs from the source container and + // the source container remains undeleted after the "rename". 
+ } else { + // See Azure SDK issue/question: + // https://github.com/Azure/azure-sdk-for-cpp/issues/5262 + EXPECT_RAISES_WITH_MESSAGE_THAT( + IOError, + ::testing::HasSubstr("The 'rename' operation is not supported on containers."), + fs()->Move(data.container_name, missing_container)); + // ASSERT_MOVE_OK(data.container_name, missing_container); + // AssertFileInfo(fs(), + // ConcatAbstractPath(missing_container, + // PreexistingData::kObjectName), FileType::File); + } + // Renaming to an empty container can work if the source is also empty + auto new_empty_container = PreexistingData::RandomContainerName(rng_); + auto new_empty_container_client = CreateContainer(new_empty_container); + ASSERT_MOVE_OK(empty_container, new_empty_container); + } + + void TestMoveContainerToPath() { + auto data = SetUpPreexistingData(); + ASSERT_MOVE("missing-container", data.ContainerPath("new-subdir"), ENOENT); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + HasDirMoveToSubdirMessage(data.container_name, data.ContainerPath("new-subdir")), + fs()->Move(data.container_name, data.ContainerPath("new-subdir"))); + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, + HasCrossContainerNotImplementedMessage(data.container_name, + "a-container/new-subdir"), + fs()->Move(data.container_name, "a-container/new-subdir")); + } + + void TestCreateContainerFromPath() { + auto data = SetUpPreexistingData(); + auto missing_path = data.RandomDirectoryPath(rng_); + ASSERT_MOVE(missing_path, "new-container", ENOENT); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + ::testing::HasSubstr("Creating files at '/' is not possible, only directories."), + fs()->Move(data.ObjectPath(), "new-file")); + auto src_dir_path = data.RandomDirectoryPath(rng_); + ASSERT_OK(fs()->CreateDir(src_dir_path, false)); + EXPECT_OK_AND_ASSIGN(auto src_dir_info, fs()->GetFileInfo(src_dir_path)); + EXPECT_EQ(src_dir_info.type(), FileType::Directory); + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, + 
HasCrossContainerNotImplementedMessage(src_dir_path, "new-container"), + fs()->Move(src_dir_path, "new-container")); + } + + void TestMovePaths() { + Status st; + auto data = SetUpPreexistingData(); + // When source doesn't exist. + ASSERT_MOVE("missing-container/src-path", "a-container/dest-path", ENOENT); Review Comment: ```suggestion ASSERT_MOVE("missing-container/src-path", data.ContainerPath("dest-path"), ENOENT); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
