felipecrv commented on code in PR #39904:
URL: https://github.com/apache/arrow/pull/39904#discussion_r1476091701


##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -942,6 +972,183 @@ FileInfo FileInfoFromBlob(std::string_view container,
   return info;
 }
 
+/// \brief RAII-style guard for releasing a lease on a blob or container.
+///
+/// The guard should be constructed right after a successful 
BlobLeaseClient::Acquire()
+/// call. Use std::optional<LeaseGuard> to declare a guard in outer scope and 
construct it
+/// later with std::optional::emplace(...).
+///
+/// Leases expire automatically, but explicit release means concurrent clients 
or
+/// ourselves when trying new operations on the same blob or container don't 
have
+/// to wait for the lease to expire by itself.
+///
+/// Learn more about leases at
+/// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob
+class LeaseGuard {
+ public:
+  using SteadyClock = std::chrono::steady_clock;
+
+ private:
+  /// \brief The time when the lease expires or is broken.
+  ///
+  /// The lease is not guaranteed to be valid until this time, but it is 
guaranteed to
+  /// be expired after this time. In other words, this is an overestimation of
+  /// the true time_point.
+  SteadyClock::time_point break_or_expires_at_;
+  const std::unique_ptr<Blobs::BlobLeaseClient> lease_client_;
+  bool release_attempt_pending_ = true;
+
+  /// \brief The latest known expiry time of a lease guarded by this class
+  /// that failed to be released or was forgotten by calling Forget().
+  static std::atomic<SteadyClock::time_point> latest_known_expiry_time_;
+
+  /// \brief The maximum lease duration supported by Azure Storage.
+  static constexpr std::chrono::seconds kMaxLeaseDuration{60};
+
+ public:
+  LeaseGuard(std::unique_ptr<Blobs::BlobLeaseClient> lease_client,
+             std::chrono::seconds lease_duration)
+      : break_or_expires_at_(SteadyClock::now() +
+                             std::min(kMaxLeaseDuration, lease_duration)),
+        lease_client_(std::move(lease_client)) {
+    DCHECK(lease_duration <= kMaxLeaseDuration);
+    DCHECK(this->lease_client_);
+  }
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(LeaseGuard);
+
+  ~LeaseGuard() {
+    // No point in trying any error handling here other than the debug 
checking. The lease
+    // will eventually expire on the backend without any intervention from us 
(just much
+    // later than if we released it).
+    [[maybe_unused]] auto status = Release();
+    ARROW_LOG(DEBUG) << status;
+  }
+
+  bool PendingRelease() const {
+    return release_attempt_pending_ && SteadyClock::now() <= 
break_or_expires_at_;
+  }
+
+ private:
+  Status DoRelease() {
+    DCHECK(release_attempt_pending_);
+    try {
+      lease_client_->Release();
+    } catch (const Storage::StorageException& exception) {
+      return ExceptionToStatus(exception, "Failed to release the ",
+                               lease_client_->GetLeaseId(), " lease on '",
+                               lease_client_->GetUrl(), "'");
+    }
+    return Status::OK();
+  }
+
+ public:
+  std::string LeaseId() const { return lease_client_->GetLeaseId(); }
+
+  bool StillValidFor(SteadyClock::duration expected_time_left) const {
+    return SteadyClock::now() + expected_time_left < break_or_expires_at_;
+  }
+
+  /// \brief Break the lease.
+  ///
+  /// The lease will stay in the "Breaking" state for break_period seconds or
+  /// less if the lease is expiring before that.
+  ///
+  /// 
https://learn.microsoft.com/en-us/rest/api/storageservices/lease-container#outcomes-of-use-attempts-on-containers-by-lease-state
+  /// 
https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob#outcomes-of-use-attempts-on-blobs-by-lease-state
+  Status Break(Azure::Nullable<std::chrono::seconds> break_period = {}) {
+    auto remaining_time_ms = [this]() {
+      const auto remaing_time = break_or_expires_at_ - SteadyClock::now();
+      return 
std::chrono::duration_cast<std::chrono::milliseconds>(remaing_time);
+    };
+#ifndef NDEBUG
+    if (break_period.HasValue() && !StillValidFor(*break_period)) {
+      ARROW_LOG(WARNING)
+          << "Azure Storage: requested break_period ("
+          << break_period.ValueOr(std::chrono::seconds{0}).count()
+          << "s) is too long or lease duration is too short for all the 
operations "
+             "performed so far (lease expires in "
+          << remaining_time_ms().count() << "ms)";
+    }
+#endif
+    Blobs::BreakLeaseOptions options;
+    options.BreakPeriod = break_period;
+    try {
+      lease_client_->Break(options);
+      break_or_expires_at_ =
+          std::min(break_or_expires_at_,
+                   SteadyClock::now() + 
break_period.ValueOr(std::chrono::seconds{0}));
+    } catch (const Storage::StorageException& exception) {
+      return ExceptionToStatus(
+          exception, "Failed to break the ", lease_client_->GetLeaseId(), " 
lease on '",
+          lease_client_->GetUrl(), "' expiring in ", 
remaining_time_ms().count(), "ms");
+    }
+    return Status::OK();
+  }
+
+  /// \brief Break the lease before deleting or renaming the resource.
+  ///
+  /// Calling this is recommended when the resource for which the lease was 
acquired is
+  /// about to be deleted as there is no way of releasing the lease after 
that, we can
+  /// only forget about it. The break_period should be a conservative estimate 
of the time
+  /// it takes to delete/rename the resource.
+  ///
+  /// If break_period is too small, the delete/rename will fail with a lease 
conflict,
+  /// and if it's too large the only consequence is that a lease on a 
non-existent
+  /// resource will remain in the "Breaking" state for a while blocking others
+  /// from recreating the resource.
+  void BreakBeforeDeletion(std::chrono::seconds break_period) {
+    ARROW_CHECK_OK(Break(break_period));
+  }
+
+  ARROW_NOINLINE Status Release() {
+    if (!PendingRelease()) {
+      return Status::OK();
+    }
+    auto status = DoRelease();
+    if (!status.ok()) {
+      Forget();
+      return status;
+    }
+    release_attempt_pending_ = false;
+    return Status::OK();
+  }
+
+  /// \brief Prevent any release attempts in the destructor.
+  ///
+  /// When it's known they would certainly fail.
+  /// \see LeaseGuard::BreakBeforeDeletion()
+  ARROW_NOINLINE void Forget() {
+    if (!PendingRelease()) {
+      release_attempt_pending_ = false;
+      return;
+    }
+    release_attempt_pending_ = false;
+    // Remember the latest known expiry time so we can gracefully handle lease
+    // acquisition failures by waiting until the latest forgotten lease.
+    auto latest = latest_known_expiry_time_.load(std::memory_order_relaxed);
+    while (
+        latest < break_or_expires_at_ &&
+        !latest_known_expiry_time_.compare_exchange_weak(latest, 
break_or_expires_at_)) {
+    }
+    DCHECK_GE(latest_known_expiry_time_.load(), break_or_expires_at_);

Review Comment:
   It's safe because `latest_known_expiry_time_` monotonically increases (it 
never goes down). So even if it's changed, the `[G]reater than or [E]qual` will 
always work.



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -942,6 +972,183 @@ FileInfo FileInfoFromBlob(std::string_view container,
   return info;
 }
 
+/// \brief RAII-style guard for releasing a lease on a blob or container.
+///
+/// The guard should be constructed right after a successful 
BlobLeaseClient::Acquire()
+/// call. Use std::optional<LeaseGuard> to declare a guard in outer scope and 
construct it
+/// later with std::optional::emplace(...).
+///
+/// Leases expire automatically, but explicit release means concurrent clients 
or
+/// ourselves when trying new operations on the same blob or container don't 
have
+/// to wait for the lease to expire by itself.
+///
+/// Learn more about leases at
+/// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob
+class LeaseGuard {
+ public:
+  using SteadyClock = std::chrono::steady_clock;
+
+ private:
+  /// \brief The time when the lease expires or is broken.
+  ///
+  /// The lease is not guaranteed to be valid until this time, but it is 
guaranteed to
+  /// be expired after this time. In other words, this is an overestimation of
+  /// the true time_point.
+  SteadyClock::time_point break_or_expires_at_;
+  const std::unique_ptr<Blobs::BlobLeaseClient> lease_client_;
+  bool release_attempt_pending_ = true;
+
+  /// \brief The latest known expiry time of a lease guarded by this class
+  /// that failed to be released or was forgotten by calling Forget().
+  static std::atomic<SteadyClock::time_point> latest_known_expiry_time_;
+
+  /// \brief The maximum lease duration supported by Azure Storage.
+  static constexpr std::chrono::seconds kMaxLeaseDuration{60};
+
+ public:
+  LeaseGuard(std::unique_ptr<Blobs::BlobLeaseClient> lease_client,
+             std::chrono::seconds lease_duration)
+      : break_or_expires_at_(SteadyClock::now() +
+                             std::min(kMaxLeaseDuration, lease_duration)),
+        lease_client_(std::move(lease_client)) {
+    DCHECK(lease_duration <= kMaxLeaseDuration);
+    DCHECK(this->lease_client_);
+  }
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(LeaseGuard);
+
+  ~LeaseGuard() {
+    // No point in trying any error handling here other than the debug 
checking. The lease
+    // will eventually expire on the backend without any intervention from us 
(just much
+    // later than if we released it).
+    [[maybe_unused]] auto status = Release();
+    ARROW_LOG(DEBUG) << status;
+  }
+
+  bool PendingRelease() const {
+    return release_attempt_pending_ && SteadyClock::now() <= 
break_or_expires_at_;
+  }
+
+ private:
+  Status DoRelease() {
+    DCHECK(release_attempt_pending_);
+    try {
+      lease_client_->Release();
+    } catch (const Storage::StorageException& exception) {
+      return ExceptionToStatus(exception, "Failed to release the ",
+                               lease_client_->GetLeaseId(), " lease on '",
+                               lease_client_->GetUrl(), "'");
+    }
+    return Status::OK();
+  }
+
+ public:
+  std::string LeaseId() const { return lease_client_->GetLeaseId(); }
+
+  bool StillValidFor(SteadyClock::duration expected_time_left) const {
+    return SteadyClock::now() + expected_time_left < break_or_expires_at_;
+  }
+
+  /// \brief Break the lease.
+  ///
+  /// The lease will stay in the "Breaking" state for break_period seconds or
+  /// less if the lease is expiring before that.
+  ///
+  /// 
https://learn.microsoft.com/en-us/rest/api/storageservices/lease-container#outcomes-of-use-attempts-on-containers-by-lease-state
+  /// 
https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob#outcomes-of-use-attempts-on-blobs-by-lease-state
+  Status Break(Azure::Nullable<std::chrono::seconds> break_period = {}) {
+    auto remaining_time_ms = [this]() {
+      const auto remaing_time = break_or_expires_at_ - SteadyClock::now();
+      return 
std::chrono::duration_cast<std::chrono::milliseconds>(remaing_time);
+    };
+#ifndef NDEBUG
+    if (break_period.HasValue() && !StillValidFor(*break_period)) {
+      ARROW_LOG(WARNING)
+          << "Azure Storage: requested break_period ("
+          << break_period.ValueOr(std::chrono::seconds{0}).count()
+          << "s) is too long or lease duration is too short for all the 
operations "
+             "performed so far (lease expires in "
+          << remaining_time_ms().count() << "ms)";
+    }
+#endif
+    Blobs::BreakLeaseOptions options;
+    options.BreakPeriod = break_period;
+    try {
+      lease_client_->Break(options);
+      break_or_expires_at_ =
+          std::min(break_or_expires_at_,
+                   SteadyClock::now() + 
break_period.ValueOr(std::chrono::seconds{0}));
+    } catch (const Storage::StorageException& exception) {
+      return ExceptionToStatus(
+          exception, "Failed to break the ", lease_client_->GetLeaseId(), " 
lease on '",
+          lease_client_->GetUrl(), "' expiring in ", 
remaining_time_ms().count(), "ms");
+    }
+    return Status::OK();
+  }
+
+  /// \brief Break the lease before deleting or renaming the resource.
+  ///
+  /// Calling this is recommended when the resource for which the lease was 
acquired is
+  /// about to be deleted as there is no way of releasing the lease after 
that, we can
+  /// only forget about it. The break_period should be a conservative estimate 
of the time
+  /// it takes to delete/rename the resource.
+  ///
+  /// If break_period is too small, the delete/rename will fail with a lease 
conflict,
+  /// and if it's too large the only consequence is that a lease on a 
non-existent
+  /// resource will remain in the "Breaking" state for a while blocking others
+  /// from recreating the resource.
+  void BreakBeforeDeletion(std::chrono::seconds break_period) {
+    ARROW_CHECK_OK(Break(break_period));
+  }
+
+  ARROW_NOINLINE Status Release() {
+    if (!PendingRelease()) {
+      return Status::OK();
+    }
+    auto status = DoRelease();
+    if (!status.ok()) {
+      Forget();
+      return status;
+    }
+    release_attempt_pending_ = false;
+    return Status::OK();
+  }
+
+  /// \brief Prevent any release attempts in the destructor.
+  ///
+  /// When it's known they would certainly fail.
+  /// \see LeaseGuard::BreakBeforeDeletion()
+  ARROW_NOINLINE void Forget() {
+    if (!PendingRelease()) {
+      release_attempt_pending_ = false;
+      return;
+    }
+    release_attempt_pending_ = false;
+    // Remember the latest known expiry time so we can gracefully handle lease
+    // acquisition failures by waiting until the latest forgotten lease.
+    auto latest = latest_known_expiry_time_.load(std::memory_order_relaxed);
+    while (
+        latest < break_or_expires_at_ &&
+        !latest_known_expiry_time_.compare_exchange_weak(latest, 
break_or_expires_at_)) {
+    }
+    DCHECK_GE(latest_known_expiry_time_.load(), break_or_expires_at_);

Review Comment:
   It's safe because `latest_known_expiry_time_` monotonically increases (it 
never goes down). So even if it's changed, the `[G]reater than or [E]qual` will 
always work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to