zeroshade commented on code in PR #39904: URL: https://github.com/apache/arrow/pull/39904#discussion_r1477673813
########## cpp/src/arrow/filesystem/azurefs.cc: ########## @@ -942,6 +972,183 @@ FileInfo FileInfoFromBlob(std::string_view container, return info; } +/// \brief RAII-style guard for releasing a lease on a blob or container. +/// +/// The guard should be constructed right after a successful BlobLeaseClient::Acquire() +/// call. Use std::optional<LeaseGuard> to declare a guard in outer scope and construct it +/// later with std::optional::emplace(...). +/// +/// Leases expire automatically, but explicit release means concurrent clients or +/// ourselves when trying new operations on the same blob or container don't have +/// to wait for the lease to expire by itself. +/// +/// Learn more about leases at +/// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob +class LeaseGuard { + public: + using SteadyClock = std::chrono::steady_clock; + + private: + /// \brief The time when the lease expires or is broken. + /// + /// The lease is not guaranteed to be valid until this time, but it is guaranteed to + /// be expired after this time. In other words, this is an overestimation of + /// the true time_point. + SteadyClock::time_point break_or_expires_at_; + const std::unique_ptr<Blobs::BlobLeaseClient> lease_client_; + bool release_attempt_pending_ = true; + + /// \brief The latest known expiry time of a lease guarded by this class + /// that failed to be released or was forgotten by calling Forget(). + static std::atomic<SteadyClock::time_point> latest_known_expiry_time_; + + /// \brief The maximum lease duration supported by Azure Storage. + static constexpr std::chrono::seconds kMaxLeaseDuration{60}; + + public: + LeaseGuard(std::unique_ptr<Blobs::BlobLeaseClient> lease_client, + std::chrono::seconds lease_duration) + : break_or_expires_at_(SteadyClock::now() + + std::min(kMaxLeaseDuration, lease_duration)), + lease_client_(std::move(lease_client)) { + DCHECK(lease_duration <= kMaxLeaseDuration); + DCHECK(this->lease_client_); + } + + ARROW_DISALLOW_COPY_AND_ASSIGN(LeaseGuard); + + ~LeaseGuard() { + // No point in trying any error handling here other than the debug checking. The lease + // will eventually expire on the backend without any intervention from us (just much + // later than if we released it). + [[maybe_unused]] auto status = Release(); + ARROW_LOG(DEBUG) << status; + } + + bool PendingRelease() const { + return release_attempt_pending_ && SteadyClock::now() <= break_or_expires_at_; + } + + private: + Status DoRelease() { + DCHECK(release_attempt_pending_); + try { + lease_client_->Release(); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to release the ", + lease_client_->GetLeaseId(), " lease on '", + lease_client_->GetUrl(), "'"); + } + return Status::OK(); + } + + public: + std::string LeaseId() const { return lease_client_->GetLeaseId(); } + + bool StillValidFor(SteadyClock::duration expected_time_left) const { + return SteadyClock::now() + expected_time_left < break_or_expires_at_; + } + + /// \brief Break the lease. + /// + /// The lease will stay in the "Breaking" state for break_period seconds or + /// less if the lease is expiring before that. + /// + /// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-container#outcomes-of-use-attempts-on-containers-by-lease-state + /// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob#outcomes-of-use-attempts-on-blobs-by-lease-state + Status Break(Azure::Nullable<std::chrono::seconds> break_period = {}) { + auto remaining_time_ms = [this]() { + const auto remaing_time = break_or_expires_at_ - SteadyClock::now(); + return std::chrono::duration_cast<std::chrono::milliseconds>(remaing_time); + }; +#ifndef NDEBUG + if (break_period.HasValue() && !StillValidFor(*break_period)) { + ARROW_LOG(WARNING) + << "Azure Storage: requested break_period (" + << break_period.ValueOr(std::chrono::seconds{0}).count() + << "s) is too long or lease duration is too short for all the operations " + "performed so far (lease expires in " + << remaining_time_ms().count() << "ms)"; + } +#endif + Blobs::BreakLeaseOptions options; + options.BreakPeriod = break_period; + try { + lease_client_->Break(options); + break_or_expires_at_ = + std::min(break_or_expires_at_, + SteadyClock::now() + break_period.ValueOr(std::chrono::seconds{0})); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus( + exception, "Failed to break the ", lease_client_->GetLeaseId(), " lease on '", + lease_client_->GetUrl(), "' expiring in ", remaining_time_ms().count(), "ms"); + } + return Status::OK(); + } + + /// \brief Break the lease before deleting or renaming the resource. + /// + /// Calling this is recommended when the resource for which the lease was acquired is + /// about to be deleted as there is no way of releasing the lease after that, we can + /// only forget about it. The break_period should be a conservative estimate of the time + /// it takes to delete/rename the resource. + /// + /// If break_period is too small, the delete/rename will fail with a lease conflict, + /// and if it's too large the only consequence is that a lease on a non-existent + /// resource will remain in the "Breaking" state for a while blocking others + /// from recreating the resource. + void BreakBeforeDeletion(std::chrono::seconds break_period) { + ARROW_CHECK_OK(Break(break_period)); + } + + ARROW_NOINLINE Status Release() { + if (!PendingRelease()) { + return Status::OK(); + } + auto status = DoRelease(); + if (!status.ok()) { + Forget(); + return status; + } + release_attempt_pending_ = false; + return Status::OK(); + } + + /// \brief Prevent any release attempts in the destructor. + /// + /// When it's known they would certainly fail. + /// \see LeaseGuard::BreakBeforeDeletion() + ARROW_NOINLINE void Forget() { + if (!PendingRelease()) { + release_attempt_pending_ = false; + return; + } + release_attempt_pending_ = false; + // Remember the latest known expiry time so we can gracefully handle lease + // acquisition failures by waiting until the latest forgotten lease. + auto latest = latest_known_expiry_time_.load(std::memory_order_relaxed); + while ( + latest < break_or_expires_at_ && + !latest_known_expiry_time_.compare_exchange_weak(latest, break_or_expires_at_)) { + } + DCHECK_GE(latest_known_expiry_time_.load(), break_or_expires_at_); + } + + ARROW_NOINLINE static void WaitUntilLatestKnownExpiryTime() { Review Comment: Can we add a comment for why we're using noinline? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
