wgtmac commented on code in PR #687:
URL: https://github.com/apache/iceberg-cpp/pull/687#discussion_r3321997084


##########
src/iceberg/update/snapshot_update.cc:
##########
@@ -250,13 +251,17 @@ Result<SnapshotUpdate::ApplyResult> 
SnapshotUpdate::Apply() {
   }
 
   ICEBERG_ASSIGN_OR_RAISE(auto manifests, Apply(base(), parent_snapshot));
+  auto metadata_tasks = TaskGroup().SetExecutor(plan_executor_);
   for (auto& manifest : manifests) {
     if (manifest.added_snapshot_id != kInvalidSnapshotId) {
       continue;
     }
-    // TODO(xxx): read in parallel and cache enriched manifests for retries
-    ICEBERG_ASSIGN_OR_RAISE(manifest, AddMetadata(manifest, ctx_->table->io(), 
base()));
+    metadata_tasks.Submit([&manifest, this]() -> Status {

Review Comment:
   This one is nicely done: each task captures a reference to a distinct 
element of `manifests` (not a loop iterator or index), so no two tasks alias 
the same element, and the `manifest_count_`/`attempt_` switch to atomics is 
backed by the `ConcurrentManifestPaths` test.



##########
src/iceberg/manifest/manifest_filter_manager.cc:
##########
@@ -401,19 +451,35 @@ Result<std::vector<ManifestFile>> 
ManifestFilterManager::FilterManifests(
   }
 
   bool trust_manifest_references = CanTrustManifestReferences(manifests);
-  manifest_evaluator_cache_.clear();
-  residual_evaluator_cache_.clear();
+  {
+    std::lock_guard lock(manifest_evaluator_cache_mutex_);
+    manifest_evaluator_cache_.clear();
+  }
+  {
+    std::lock_guard lock(residual_evaluator_cache_mutex_);
+    residual_evaluator_cache_.clear();
+  }
+
+  std::vector<FilterManifestResult> filter_results(manifests.size());
+  auto filter_tasks = TaskGroup().SetExecutor(executor_);
+  for (auto&& [manifest, result] : std::views::zip(manifests, filter_results)) 
{
+    filter_tasks.Submit([&]() -> Status {

Review Comment:
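   This is safe today: the bindings refer to elements of `manifests` and 
`filter_results`, both of which outlive `Run()`. But capturing a loop's 
structured bindings by `[&]` into a deferred closure is an easy footgun. The 
task may run after the loop iteration that created it has ended. If the loop 
is later changed to bind by value or to iterate over a temporary range, every 
task would silently hold a dangling reference. Capturing explicitly (e.g. 
`[&manifest, &result, ...]`) makes the intent clearer and harder to break later.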



##########
src/iceberg/util/retry_util.h:
##########
@@ -69,76 +53,104 @@ struct ICEBERG_EXPORT RetryConfig {
   double scale_factor = 2.0;
 };
 
-/// \brief Utility class for running tasks with retry logic
-///
-/// When retries are enabled (`num_retries > 0`), callers must explicitly 
configure
-/// retry policy with `OnlyRetryOn(...)` or `StopRetryOn(...)`.
-class ICEBERG_EXPORT RetryRunner {
- public:
-  /// \brief Construct a RetryRunner with the given configuration
-  explicit RetryRunner(RetryConfig config = {}) : config_(std::move(config)) {}
+namespace detail {
 
-  /// \brief Specify error types that should trigger a retry.
-  ///
-  /// When set, only errors matching one of these kinds will be retried.
-  /// All other errors will stop retries immediately.
-  ///
-  /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
-  /// StopRetryOn is ignored.
-  RetryRunner& OnlyRetryOn(std::initializer_list<ErrorKind> error_kinds) {
-    retry_policy_mode_ = RetryPolicyMode::kOnlyRetryOn;
-    retry_error_kinds_ = std::vector<ErrorKind>(error_kinds);
-    return *this;
-  }
+class ICEBERG_EXPORT RetryRunnerBase {
+ protected:
+  explicit RetryRunnerBase(RetryConfig config) : config_(std::move(config)) {}
 
-  /// \brief Specify a single error type that should trigger a retry.
-  ///
-  /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
-  /// StopRetryOn is ignored.
-  RetryRunner& OnlyRetryOn(ErrorKind error_kind) { return 
OnlyRetryOn({error_kind}); }
+  using Clock = std::chrono::steady_clock;
+  using Duration = std::chrono::milliseconds;
+  using TimePoint = Clock::time_point;
 
-  /// \brief Specify error types that should stop retries immediately.
-  ///
-  /// When set, errors matching one of these kinds will not be retried.
-  /// All other errors will be retried.
-  ///
-  /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
-  /// StopRetryOn is ignored.
-  RetryRunner& StopRetryOn(std::initializer_list<ErrorKind> error_kinds) {
-    if (retry_policy_mode_ == RetryPolicyMode::kOnlyRetryOn) {
-      return *this;
-    }
+  /// \brief Validate retry counts and timing bounds.
+  Status ValidateConfig() const;
+  std::optional<TimePoint> ComputeDeadline() const;
+  bool HasTimedOut(const std::optional<TimePoint>& deadline) const;
+  std::optional<Duration> RetryDelayWithinBudget(
+      int32_t attempt, const std::optional<TimePoint>& deadline) const;
+  bool WaitForNextAttempt(int32_t attempt,
+                          const std::optional<TimePoint>& deadline) const;
+  /// \brief Calculate delay with exponential backoff and jitter
+  int32_t CalculateDelay(int32_t attempt) const;
+
+  RetryConfig config_;
+};
+
+}  // namespace detail
+
+namespace retry {
+
+enum class RetryPolicyMode {
+  kNoRetry,
+  kOnlyRetryOn,
+  kStopRetryOn,
+};
 
-    retry_policy_mode_ = RetryPolicyMode::kStopRetryOn;
-    retry_error_kinds_ = std::vector<ErrorKind>(error_kinds);
-    return *this;
+template <RetryPolicyMode Mode, ErrorKind... Kinds>
+struct RetryPolicy {
+  static_assert(Mode != RetryPolicyMode::kNoRetry || sizeof...(Kinds) == 0,
+                "NoRetry must not include error kinds");
+  static_assert(Mode == RetryPolicyMode::kNoRetry || sizeof...(Kinds) > 0,
+                "RetryPolicy must include at least one error kind");
+
+  static constexpr RetryPolicyMode kMode = Mode;
+  static constexpr bool kEnabled = Mode != RetryPolicyMode::kNoRetry;
+
+  static constexpr bool ShouldRetry(ErrorKind kind) {
+    if constexpr (Mode == RetryPolicyMode::kNoRetry) {
+      return false;
+    } else if constexpr (Mode == RetryPolicyMode::kOnlyRetryOn) {
+      return ((kind == Kinds) || ...);
+    } else {
+      return !((kind == Kinds) || ...);
+    }
   }
+};
 
-  /// \brief Specify a single error type that should stop retries immediately.
-  ///
-  /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
-  /// StopRetryOn is ignored.
-  RetryRunner& StopRetryOn(ErrorKind error_kind) { return 
StopRetryOn({error_kind}); }
+using NoRetry = RetryPolicy<RetryPolicyMode::kNoRetry>;

Review Comment:
   Note this drops a couple of runtime behaviors the old API had: `OnlyRetryOn` 
taking precedence over `StopRetryOn`, and the "policy must be configured" 
runtime error. The precedence rule no longer exists because each `RetryPolicy` 
has exactly one mode. The runtime error is replaced by compile-time 
`static_assert`s (a retrying policy must list at least one error kind) plus an 
explicit `NoRetry` policy. The only caller (`MakeCommitRetryRunner`) is updated, 
so it's functionally fine, but it's a user-visible tightening worth calling out 
in the PR description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to