This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ff422295 refactor(util): make retry policy explicit and deterministic (#633)
ff422295 is described below

commit ff4222958eb47ab41dc6603e945318c8f3857316
Author: Gang Wu <[email protected]>
AuthorDate: Wed May 6 16:04:16 2026 +0800

    refactor(util): make retry policy explicit and deterministic (#633)
---
 src/iceberg/CMakeLists.txt             |   1 +
 src/iceberg/meson.build                |   1 +
 src/iceberg/test/retry_util_test.cc    | 399 ++++++++++++++++++++++++++++++---
 src/iceberg/util/retry_util.cc         | 186 +++++++++++++++
 src/iceberg/util/retry_util.h          | 164 +++++++-------
 src/iceberg/util/retry_util_internal.h |  61 +++++
 6 files changed, 689 insertions(+), 123 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 6d190cb6..02099f6b 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -104,6 +104,7 @@ set(ICEBERG_SOURCES
     util/gzip_internal.cc
     util/murmurhash3_internal.cc
     util/property_util.cc
+    util/retry_util.cc
     util/snapshot_util.cc
     util/string_util.cc
     util/struct_like_set.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 17ed723f..41a5c2dd 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -126,6 +126,7 @@ iceberg_sources = files(
     'util/gzip_internal.cc',
     'util/murmurhash3_internal.cc',
     'util/property_util.cc',
+    'util/retry_util.cc',
     'util/snapshot_util.cc',
     'util/string_util.cc',
     'util/struct_like_set.cc',
diff --git a/src/iceberg/test/retry_util_test.cc b/src/iceberg/test/retry_util_test.cc
index 05c7cb0e..ead22191 100644
--- a/src/iceberg/test/retry_util_test.cc
+++ b/src/iceberg/test/retry_util_test.cc
@@ -19,15 +19,73 @@
 
 #include "iceberg/util/retry_util.h"
 
-#include <chrono>
-#include <thread>
+#include <concepts>
+#include <limits>
+#include <vector>
 
 #include <gtest/gtest.h>
 
 #include "iceberg/result.h"
 #include "iceberg/test/matchers.h"
+#include "iceberg/util/retry_util_internal.h"
 
 namespace iceberg {
+namespace {
+
+struct ResultReturningTask {
+  Result<int> operator()() const { return 1; }
+};
+
+struct NonResultReturningTask {
+  int operator()() const { return 1; }
+};
+
+static_assert(detail::RetryTask<ResultReturningTask>);
+static_assert(!detail::RetryTask<NonResultReturningTask>);
+static_assert(requires(RetryRunner runner, ResultReturningTask task) {
+  { runner.Run(task) } -> std::same_as<Result<int>>;
+});
+
+class FakeRetryEnvironment {
+ public:
+  using Duration = RetryTestHooks::Duration;
+  using TimePoint = RetryTestHooks::TimePoint;
+
+  FakeRetryEnvironment() {
+    hooks_.now = [this]() { return now_; };
+    hooks_.sleep_for = [this](Duration duration) {
+      sleep_durations_.push_back(duration);
+      now_ += duration;
+    };
+    hooks_.jitter = [this](int32_t base_delay_ms) {
+      observed_base_delays_ms_.push_back(base_delay_ms);
+      return base_delay_ms + jitter_offset_ms_;
+    };
+  }
+
+  void Advance(Duration duration) { now_ += duration; }
+
+  void SetJitterOffsetMs(int32_t jitter_offset_ms) {
+    jitter_offset_ms_ = jitter_offset_ms;
+  }
+
+  const RetryTestHooks& hooks() const { return hooks_; }
+
+  const std::vector<Duration>& sleep_durations() const { return sleep_durations_; }
+
+  const std::vector<int32_t>& observed_base_delays_ms() const {
+    return observed_base_delays_ms_;
+  }
+
+ private:
+  RetryTestHooks hooks_;
+  TimePoint now_{};
+  int32_t jitter_offset_ms_ = 0;
+  std::vector<Duration> sleep_durations_;
+  std::vector<int32_t> observed_base_delays_ms_;
+};
+
+}  // namespace
 
 TEST(RetryRunnerTest, SuccessOnFirstAttempt) {
   int call_count = 0;
@@ -37,6 +95,7 @@ TEST(RetryRunnerTest, SuccessOnFirstAttempt) {
                                         .min_wait_ms = 1,
                                         .max_wait_ms = 10,
                                         .total_timeout_ms = 5000})
+                    .OnlyRetryOn(ErrorKind::kCommitFailed)
                     .Run(
                         [&]() -> Result<int> {
                           ++call_count;
@@ -58,6 +117,7 @@ TEST(RetryRunnerTest, RetryOnceThenSucceed) {
                                         .min_wait_ms = 1,
                                         .max_wait_ms = 10,
                                         .total_timeout_ms = 5000})
+                    .OnlyRetryOn(ErrorKind::kCommitFailed)
                     .Run(
                         [&]() -> Result<int> {
                           ++call_count;
@@ -82,6 +142,7 @@ TEST(RetryRunnerTest, MaxAttemptsExhausted) {
                                         .min_wait_ms = 1,
                                         .max_wait_ms = 10,
                                         .total_timeout_ms = 5000})
+                    .OnlyRetryOn(ErrorKind::kCommitFailed)
                     .Run(
                         [&]() -> Result<int> {
                           ++call_count;
@@ -140,6 +201,32 @@ TEST(RetryRunnerTest, OnlyRetryOnMatchingError) {
   EXPECT_EQ(attempts, 3);
 }
 
+TEST(RetryRunnerTest, OnlyRetryOnTakesPrecedenceOverStopRetryOn) {
+  int call_count = 0;
+  int32_t attempts = 0;
+
+  auto result = RetryRunner(RetryConfig{.num_retries = 2,
+                                        .min_wait_ms = 1,
+                                        .max_wait_ms = 10,
+                                        .total_timeout_ms = 5000})
+                    .OnlyRetryOn(ErrorKind::kCommitFailed)
+                    .StopRetryOn(ErrorKind::kCommitFailed)
+                    .Run(
+                        [&]() -> Result<int> {
+                          ++call_count;
+                          if (call_count == 1) {
+                            return CommitFailed("transient");
+                          }
+                          return 100;
+                        },
+                        &attempts);
+
+  EXPECT_THAT(result, IsOk());
+  EXPECT_EQ(*result, 100);
+  EXPECT_EQ(call_count, 2);
+  EXPECT_EQ(attempts, 2);
+}
+
 TEST(RetryRunnerTest, StopRetryOnMatchingError) {
   int call_count = 0;
   int32_t attempts = 0;
@@ -148,7 +235,7 @@ TEST(RetryRunnerTest, StopRetryOnMatchingError) {
                                         .min_wait_ms = 1,
                                         .max_wait_ms = 10,
                                         .total_timeout_ms = 5000})
-                    .StopRetryOn({ErrorKind::kCommitStateUnknown})
+                    .StopRetryOn(ErrorKind::kCommitStateUnknown)
                     .Run(
                         [&]() -> Result<int> {
                           ++call_count;
@@ -161,14 +248,40 @@ TEST(RetryRunnerTest, StopRetryOnMatchingError) {
   EXPECT_EQ(attempts, 1);
 }
 
-TEST(RetryRunnerTest, ZeroRetries) {
+TEST(RetryRunnerTest, StopRetryOnNonMatchingErrorAllowsRetry) {
   int call_count = 0;
   int32_t attempts = 0;
 
-  auto result = RetryRunner(RetryConfig{.num_retries = 0,
+  auto result = RetryRunner(RetryConfig{.num_retries = 2,
                                         .min_wait_ms = 1,
                                         .max_wait_ms = 10,
                                         .total_timeout_ms = 5000})
+                    .StopRetryOn({ErrorKind::kCommitStateUnknown})
+                    .Run(
+                        [&]() -> Result<int> {
+                          ++call_count;
+                          if (call_count == 1) {
+                            return CommitFailed("retryable");
+                          }
+                          return 88;
+                        },
+                        &attempts);
+
+  EXPECT_THAT(result, IsOk());
+  EXPECT_EQ(*result, 88);
+  EXPECT_EQ(call_count, 2);
+  EXPECT_EQ(attempts, 2);
+}
+
+TEST(RetryRunnerTest, ZeroRetriesAllowsUnsetPolicyAndSkipsBackoffValidation) {
+  int call_count = 0;
+  int32_t attempts = 0;
+
+  auto result = RetryRunner(RetryConfig{.num_retries = 0,
+                                        .min_wait_ms = 0,
+                                        .max_wait_ms = 0,
+                                        .total_timeout_ms = 5000,
+                                        .scale_factor = 0.5})
                     .Run(
                         [&]() -> Result<int> {
                           ++call_count;
@@ -181,7 +294,159 @@ TEST(RetryRunnerTest, ZeroRetries) {
   EXPECT_EQ(attempts, 1);
 }
 
+TEST(RetryRunnerTest, NegativeRetriesFailsBeforeTaskRuns) {
+  int call_count = 0;
+  int32_t attempts = 0;
+
+  auto result = RetryRunner(RetryConfig{.num_retries = -1,
+                                        .min_wait_ms = 1,
+                                        .max_wait_ms = 10,
+                                        .total_timeout_ms = 5000})
+                    .Run(
+                        [&]() -> Result<int> {
+                          ++call_count;
+                          return 1;
+                        },
+                        &attempts);
+
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("num_retries must be non-negative"));
+  EXPECT_EQ(call_count, 0);
+  EXPECT_EQ(attempts, 0);
+}
+
+TEST(RetryRunnerTest, InvalidBackoffConfigFailsBeforeTaskRuns) {
+  struct InvalidConfigCase {
+    RetryConfig config;
+    const char* expected_message;
+  };
+
+  const std::vector<InvalidConfigCase> test_cases = {
+      {.config = RetryConfig{.num_retries = std::numeric_limits<int32_t>::max(),
+                             .min_wait_ms = 1,
+                             .max_wait_ms = 10,
+                             .total_timeout_ms = 5000},
+       .expected_message = "num_retries is too large"},
+      {.config = RetryConfig{.num_retries = 1,
+                             .min_wait_ms = 0,
+                             .max_wait_ms = 10,
+                             .total_timeout_ms = 5000},
+       .expected_message = "min_wait_ms must be positive"},
+      {.config = RetryConfig{.num_retries = 1,
+                             .min_wait_ms = 1,
+                             .max_wait_ms = 0,
+                             .total_timeout_ms = 5000},
+       .expected_message = "max_wait_ms must be positive"},
+      {.config = RetryConfig{.num_retries = 1,
+                             .min_wait_ms = 20,
+                             .max_wait_ms = 10,
+                             .total_timeout_ms = 5000},
+       .expected_message = "max_wait_ms must be greater than or equal to min_wait_ms"},
+      {.config = RetryConfig{.num_retries = 1,
+                             .min_wait_ms = 1,
+                             .max_wait_ms = 10,
+                             .total_timeout_ms = 5000,
+                             .scale_factor = 0.5},
+       .expected_message = "scale_factor must be finite and at least 1.0"},
+      {.config = RetryConfig{.num_retries = 1,
+                             .min_wait_ms = 1,
+                             .max_wait_ms = 10,
+                             .total_timeout_ms = 5000,
+                             .scale_factor = std::numeric_limits<double>::infinity()},
+       .expected_message = "scale_factor must be finite and at least 1.0"},
+  };
+
+  for (const auto& test_case : test_cases) {
+    int call_count = 0;
+    int32_t attempts = 0;
+
+    auto result = RetryRunner(test_case.config)
+                      .OnlyRetryOn(ErrorKind::kCommitFailed)
+                      .Run(
+                          [&]() -> Result<int> {
+                            ++call_count;
+                            return 1;
+                          },
+                          &attempts);
+
+    EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument))
+        << test_case.expected_message;
+    EXPECT_THAT(result, HasErrorMessage(test_case.expected_message));
+    EXPECT_EQ(call_count, 0);
+    EXPECT_EQ(attempts, 0);
+  }
+}
+
+TEST(RetryRunnerTest, UnsetRetryPolicyFailsBeforeTaskRuns) {
+  int call_count = 0;
+  int32_t attempts = 0;
+
+  auto result = RetryRunner(RetryConfig{.num_retries = 1,
+                                        .min_wait_ms = 1,
+                                        .max_wait_ms = 10,
+                                        .total_timeout_ms = 5000})
+                    .Run(
+                        [&]() -> Result<int> {
+                          ++call_count;
+                          return CommitFailed("fail");
+                        },
+                        &attempts);
+
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("Retry policy must be explicitly configured"));
+  EXPECT_EQ(call_count, 0);
+  EXPECT_EQ(attempts, 0);
+}
+
+TEST(RetryRunnerTest, EmptyOnlyRetryOnPolicyFailsBeforeTaskRuns) {
+  int call_count = 0;
+  int32_t attempts = 0;
+
+  auto result = RetryRunner(RetryConfig{.num_retries = 1,
+                                        .min_wait_ms = 1,
+                                        .max_wait_ms = 10,
+                                        .total_timeout_ms = 5000})
+                    .OnlyRetryOn(std::initializer_list<ErrorKind>{})
+                    .Run(
+                        [&]() -> Result<int> {
+                          ++call_count;
+                          return CommitFailed("fail");
+                        },
+                        &attempts);
+
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result,
+              HasErrorMessage("Retry policy must include at least one error kind"));
+  EXPECT_EQ(call_count, 0);
+  EXPECT_EQ(attempts, 0);
+}
+
+TEST(RetryRunnerTest, EmptyStopRetryOnPolicyFailsBeforeTaskRuns) {
+  int call_count = 0;
+  int32_t attempts = 0;
+
+  auto result = RetryRunner(RetryConfig{.num_retries = 1,
+                                        .min_wait_ms = 1,
+                                        .max_wait_ms = 10,
+                                        .total_timeout_ms = 5000})
+                    .StopRetryOn({})
+                    .Run(
+                        [&]() -> Result<int> {
+                          ++call_count;
+                          return CommitFailed("fail");
+                        },
+                        &attempts);
+
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result,
+              HasErrorMessage("Retry policy must include at least one error kind"));
+  EXPECT_EQ(call_count, 0);
+  EXPECT_EQ(attempts, 0);
+}
+
 TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) {
+  FakeRetryEnvironment fake_retry;
+  ScopedRetryTestHooks scoped_hooks(fake_retry.hooks());
   int call_count = 0;
   int32_t attempts = 0;
 
@@ -189,14 +454,12 @@ TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) {
                                         .min_wait_ms = 20,
                                         .max_wait_ms = 20,
                                         .total_timeout_ms = 15})
+                    .OnlyRetryOn(ErrorKind::kCommitFailed)
                     .Run(
                         [&]() -> Result<int> {
                           ++call_count;
-                          // The first failure consumes most of the 15 ms budget, so the
-                          // next 20 ms backoff should prevent another attempt from
-                          // starting.
                           if (call_count == 1) {
-                            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+                            fake_retry.Advance(FakeRetryEnvironment::Duration(10));
                           }
                           return CommitFailed("retry budget exhausted");
                         },
@@ -205,6 +468,97 @@ TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) {
   EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
   EXPECT_EQ(call_count, 1);
   EXPECT_EQ(attempts, 1);
+  EXPECT_TRUE(fake_retry.sleep_durations().empty());
+  EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector<int32_t>({20}));
+}
+
+TEST(RetryRunnerTest, TotalTimeoutStopsWhenDelayEqualsRemainingBudget) {
+  FakeRetryEnvironment fake_retry;
+  ScopedRetryTestHooks scoped_hooks(fake_retry.hooks());
+  int call_count = 0;
+  int32_t attempts = 0;
+
+  auto result = RetryRunner(RetryConfig{.num_retries = 3,
+                                        .min_wait_ms = 10,
+                                        .max_wait_ms = 10,
+                                        .total_timeout_ms = 20})
+                    .OnlyRetryOn(ErrorKind::kCommitFailed)
+                    .Run(
+                        [&]() -> Result<int> {
+                          ++call_count;
+                          fake_retry.Advance(FakeRetryEnvironment::Duration(10));
+                          return CommitFailed("retry budget exhausted");
+                        },
+                        &attempts);
+
+  EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
+  EXPECT_EQ(call_count, 1);
+  EXPECT_EQ(attempts, 1);
+  EXPECT_TRUE(fake_retry.sleep_durations().empty());
+  EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector<int32_t>({10}));
+}
+
+TEST(RetryRunnerTest, NonPositiveTotalTimeoutDisablesDeadline) {
+  FakeRetryEnvironment fake_retry;
+  ScopedRetryTestHooks scoped_hooks(fake_retry.hooks());
+  int call_count = 0;
+  int32_t attempts = 0;
+
+  auto result = RetryRunner(RetryConfig{.num_retries = 2,
+                                        .min_wait_ms = 10,
+                                        .max_wait_ms = 10,
+                                        .total_timeout_ms = 0})
+                    .OnlyRetryOn(ErrorKind::kCommitFailed)
+                    .Run(
+                        [&]() -> Result<int> {
+                          ++call_count;
+                          fake_retry.Advance(FakeRetryEnvironment::Duration(100));
+                          if (call_count <= 2) {
+                            return CommitFailed("transient");
+                          }
+                          return 123;
+                        },
+                        &attempts);
+
+  EXPECT_THAT(result, IsOk());
+  EXPECT_EQ(*result, 123);
+  EXPECT_EQ(call_count, 3);
+  EXPECT_EQ(attempts, 3);
+  EXPECT_EQ(fake_retry.sleep_durations(), std::vector<FakeRetryEnvironment::Duration>(
+                                              {FakeRetryEnvironment::Duration(10),
+                                               FakeRetryEnvironment::Duration(10)}));
+  EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector<int32_t>({10, 10}));
+}
+
+TEST(RetryRunnerTest, RetryDelayDoesNotExceedMaxWaitAfterJitter) {
+  FakeRetryEnvironment fake_retry;
+  fake_retry.SetJitterOffsetMs(100);
+  ScopedRetryTestHooks scoped_hooks(fake_retry.hooks());
+  int call_count = 0;
+  int32_t attempts = 0;
+
+  auto result = RetryRunner(RetryConfig{.num_retries = 1,
+                                        .min_wait_ms = 10,
+                                        .max_wait_ms = 10,
+                                        .total_timeout_ms = 0})
+                    .OnlyRetryOn(ErrorKind::kCommitFailed)
+                    .Run(
+                        [&]() -> Result<int> {
+                          ++call_count;
+                          if (call_count == 1) {
+                            return CommitFailed("transient");
+                          }
+                          return 321;
+                        },
+                        &attempts);
+
+  EXPECT_THAT(result, IsOk());
+  EXPECT_EQ(*result, 321);
+  EXPECT_EQ(call_count, 2);
+  EXPECT_EQ(attempts, 2);
+  EXPECT_EQ(fake_retry.sleep_durations(), std::vector<FakeRetryEnvironment::Duration>(
+                                              {FakeRetryEnvironment::Duration(10)}));
+  EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector<int32_t>({10}));
 }
 
 TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) {
@@ -274,31 +628,4 @@ TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) {
   EXPECT_EQ(attempts, 3);
 }
 
-TEST(RetryRunnerTest, DefaultRetryAllErrors) {
-  int call_count = 0;
-  int32_t attempts = 0;
-
-  auto result = RetryRunner(RetryConfig{.num_retries = 3,
-                                        .min_wait_ms = 1,
-                                        .max_wait_ms = 10,
-                                        .total_timeout_ms = 5000})
-                    .Run(
-                        [&]() -> Result<int> {
-                          ++call_count;
-                          if (call_count == 1) {
-                            return IOError("disk full");
-                          }
-                          if (call_count == 2) {
-                            return ValidationFailed("bad schema");
-                          }
-                          return 55;
-                        },
-                        &attempts);
-
-  EXPECT_THAT(result, IsOk());
-  EXPECT_EQ(*result, 55);
-  EXPECT_EQ(call_count, 3);
-  EXPECT_EQ(attempts, 3);
-}
-
 }  // namespace iceberg
diff --git a/src/iceberg/util/retry_util.cc b/src/iceberg/util/retry_util.cc
new file mode 100644
index 00000000..d6e0d509
--- /dev/null
+++ b/src/iceberg/util/retry_util.cc
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/retry_util.h"
+
+#include <algorithm>
+#include <chrono>
+#include <cmath>
+#include <cstdint>
+#include <limits>
+#include <optional>
+#include <random>
+#include <thread>
+
+#include "iceberg/util/retry_util_internal.h"
+
+namespace iceberg {
+namespace {
+
+const RetryTestHooks*& ActiveRetryTestHooks() {
+  // Keep test hooks thread-local so fake retry timing in one test thread does not
+  // leak into unrelated retry work or require synchronization around a global pointer.
+  static thread_local const RetryTestHooks* active_retry_test_hooks = nullptr;
+  return active_retry_test_hooks;
+}
+
+RetryTestHooks::TimePoint RetryNow() {
+  const auto* hooks = GetActiveRetryTestHooks();
+  if (hooks != nullptr && hooks->now) {
+    return hooks->now();
+  }
+  return RetryTestHooks::Clock::now();
+}
+
+void RetrySleepFor(RetryTestHooks::Duration duration) {
+  const auto* hooks = GetActiveRetryTestHooks();
+  if (hooks != nullptr && hooks->sleep_for) {
+    hooks->sleep_for(duration);
+    return;
+  }
+  std::this_thread::sleep_for(duration);
+}
+
+int32_t ApplyRetryJitter(int32_t base_delay_ms) {
+  const auto* hooks = GetActiveRetryTestHooks();
+  if (hooks != nullptr && hooks->jitter) {
+    return hooks->jitter(base_delay_ms);
+  }
+
+  static thread_local std::mt19937 gen(std::random_device{}());
+  const int32_t jitter_range = std::max(1, base_delay_ms / 10);
+  std::uniform_int_distribution<> dis(0, jitter_range - 1);
+  const int64_t jittered_delay_ms = static_cast<int64_t>(base_delay_ms) + dis(gen);
+  return static_cast<int32_t>(
+      std::min<int64_t>(jittered_delay_ms, std::numeric_limits<int32_t>::max()));
+}
+
+}  // namespace
+
+const RetryTestHooks* GetActiveRetryTestHooks() { return ActiveRetryTestHooks(); }
+
+void SetActiveRetryTestHooks(const RetryTestHooks* hooks) {
+  ActiveRetryTestHooks() = hooks;
+}
+
+Status RetryRunner::ValidateConfig() const {
+  if (config_.num_retries < 0) {
+    return InvalidArgument("num_retries must be non-negative, got {}",
+                           config_.num_retries);
+  }
+  if (config_.num_retries == 0) {
+    return {};
+  }
+  if (config_.num_retries == std::numeric_limits<int32_t>::max()) {
+    return InvalidArgument("num_retries is too large, got {}", config_.num_retries);
+  }
+  if (config_.min_wait_ms <= 0) {
+    return InvalidArgument("min_wait_ms must be positive, got {}", config_.min_wait_ms);
+  }
+  if (config_.max_wait_ms <= 0) {
+    return InvalidArgument("max_wait_ms must be positive, got {}", config_.max_wait_ms);
+  }
+  if (config_.max_wait_ms < config_.min_wait_ms) {
+    return InvalidArgument("max_wait_ms must be greater than or equal to min_wait_ms");
+  }
+  if (!std::isfinite(config_.scale_factor) || config_.scale_factor < 1.0) {
+    return InvalidArgument("scale_factor must be finite and at least 1.0, got {}",
+                           config_.scale_factor);
+  }
+  if (retry_policy_mode_ == RetryPolicyMode::kUnset) {
+    return InvalidArgument(
+        "Retry policy must be explicitly configured with OnlyRetryOn(...) or "
+        "StopRetryOn(...) when num_retries > 0");
+  }
+  if (retry_error_kinds_.empty()) {
+    return InvalidArgument("Retry policy must include at least one error kind");
+  }
+
+  return {};
+}
+
+std::optional<RetryRunner::TimePoint> RetryRunner::ComputeDeadline() const {
+  if (config_.total_timeout_ms <= 0) {
+    return std::nullopt;
+  }
+  return RetryNow() + Duration(config_.total_timeout_ms);
+}
+
+bool RetryRunner::HasTimedOut(const std::optional<TimePoint>& deadline) const {
+  return deadline.has_value() && RetryNow() >= *deadline;
+}
+
+bool RetryRunner::ShouldRetry(ErrorKind kind) const {
+  const bool policy_contains_kind = std::ranges::contains(retry_error_kinds_, kind);
+  switch (retry_policy_mode_) {
+    case RetryPolicyMode::kOnlyRetryOn:
+      return policy_contains_kind;
+    case RetryPolicyMode::kStopRetryOn:
+      return !policy_contains_kind;
+    case RetryPolicyMode::kUnset:
+      return false;
+  }
+  return false;
+}
+
+bool RetryRunner::CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts,
+                           const std::optional<TimePoint>& deadline) const {
+  return attempt < max_attempts && !HasTimedOut(deadline) && ShouldRetry(kind);
+}
+
+std::optional<RetryRunner::Duration> RetryRunner::RetryDelayWithinBudget(
+    int32_t attempt, const std::optional<TimePoint>& deadline) const {
+  const auto delay = Duration(CalculateDelay(attempt));
+  if (!deadline.has_value()) {
+    return delay;
+  }
+
+  const auto now = RetryNow();
+  if (now >= *deadline) {
+    return std::nullopt;
+  }
+
+  const auto remaining = std::chrono::duration_cast<Duration>(*deadline - now);
+  if (remaining <= Duration::zero() || delay >= remaining) {
+    return std::nullopt;
+  }
+
+  return delay;
+}
+
+bool RetryRunner::WaitForNextAttempt(int32_t attempt,
+                                     const std::optional<TimePoint>& deadline) const {
+  const auto delay = RetryDelayWithinBudget(attempt, deadline);
+  if (!delay.has_value()) {
+    return false;
+  }
+
+  RetrySleepFor(*delay);
+  return !HasTimedOut(deadline);
+}
+
+int32_t RetryRunner::CalculateDelay(int32_t attempt) const {
+  const double base_delay =
+      config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1);
+  const int32_t delay_ms = static_cast<int32_t>(
+      std::min(base_delay, static_cast<double>(config_.max_wait_ms)));
+  return std::clamp(ApplyRetryJitter(delay_ms), 1, config_.max_wait_ms);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/retry_util.h b/src/iceberg/util/retry_util.h
index 7041a40e..83e2cd5c 100644
--- a/src/iceberg/util/retry_util.h
+++ b/src/iceberg/util/retry_util.h
@@ -19,12 +19,14 @@
 
 #pragma once
 
-#include <algorithm>
 #include <chrono>
-#include <cmath>
+#include <concepts>
+#include <cstdint>
+#include <functional>
+#include <initializer_list>
 #include <optional>
-#include <random>
-#include <thread>
+#include <type_traits>
+#include <utility>
 #include <vector>
 
 #include "iceberg/iceberg_export.h"
@@ -32,6 +34,27 @@
 
 namespace iceberg {
 
+namespace detail {
+
+template <typename T>
+struct IsResult : std::false_type {};
+
+template <typename T>
+struct IsResult<Result<T>> : std::true_type {};
+
+template <typename T>
+concept ResultType = IsResult<std::remove_cvref_t<T>>::value;
+
+template <typename F>
+concept RetryTask = requires(F& f) {
+  { std::invoke(f) } -> ResultType;
+};
+
+template <typename F>
+using RetryTaskResult = std::remove_cvref_t<std::invoke_result_t<F&>>;
+
+}  // namespace detail
+
 /// \brief Configuration for retry behavior
 struct ICEBERG_EXPORT RetryConfig {
   /// Maximum number of retry attempts (not including the first attempt)
@@ -47,6 +70,9 @@ struct ICEBERG_EXPORT RetryConfig {
 };
 
 /// \brief Utility class for running tasks with retry logic
+///
+/// When retries are enabled (`num_retries > 0`), callers must explicitly configure
+/// retry policy with `OnlyRetryOn(...)` or `StopRetryOn(...)`.
 class ICEBERG_EXPORT RetryRunner {
  public:
   /// \brief Construct a RetryRunner with the given configuration
@@ -60,7 +86,8 @@ class ICEBERG_EXPORT RetryRunner {
   /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
   /// StopRetryOn is ignored.
   RetryRunner& OnlyRetryOn(std::initializer_list<ErrorKind> error_kinds) {
-    only_retry_on_ = std::vector<ErrorKind>(error_kinds);
+    retry_policy_mode_ = RetryPolicyMode::kOnlyRetryOn;
+    retry_error_kinds_ = std::vector<ErrorKind>(error_kinds);
     return *this;
   }
 
@@ -68,10 +95,7 @@ class ICEBERG_EXPORT RetryRunner {
   ///
   /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
   /// StopRetryOn is ignored.
-  RetryRunner& OnlyRetryOn(ErrorKind error_kind) {
-    only_retry_on_ = std::vector<ErrorKind>{error_kind};
-    return *this;
-  }
+  RetryRunner& OnlyRetryOn(ErrorKind error_kind) { return OnlyRetryOn({error_kind}); }
 
   /// \brief Specify error types that should stop retries immediately.
   ///
@@ -81,18 +105,35 @@ class ICEBERG_EXPORT RetryRunner {
   /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
   /// StopRetryOn is ignored.
   RetryRunner& StopRetryOn(std::initializer_list<ErrorKind> error_kinds) {
-    stop_retry_on_ = std::vector<ErrorKind>(error_kinds);
+    if (retry_policy_mode_ == RetryPolicyMode::kOnlyRetryOn) {
+      return *this;
+    }
+
+    retry_policy_mode_ = RetryPolicyMode::kStopRetryOn;
+    retry_error_kinds_ = std::vector<ErrorKind>(error_kinds);
     return *this;
   }
 
+  /// \brief Specify a single error type that should stop retries immediately.
+  ///
+  /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
+  /// StopRetryOn is ignored.
+  RetryRunner& StopRetryOn(ErrorKind error_kind) { return StopRetryOn({error_kind}); }
+
   /// \brief Run a task that returns a Result<T>
   ///
+  /// When `num_retries > 0`, the retry policy must be configured explicitly via
+  /// `OnlyRetryOn(...)` or `StopRetryOn(...)`.
+  ///
   /// TODO: Replace attempt_counter with a metrics reporter once it is available.
-  template <typename F, typename T = typename std::invoke_result_t<F>::value_type>
-  Result<T> Run(F&& task, int32_t* attempt_counter = nullptr) {
-    if (config_.num_retries < 0) {
-      return InvalidArgument("num_retries must be non-negative, got {}",
-                             config_.num_retries);
+  template <typename F>
+    requires detail::RetryTask<F>
+  auto Run(F&& task, int32_t* attempt_counter = nullptr) -> detail::RetryTaskResult<F> {
+    using TaskResult = detail::RetryTaskResult<F>;
+
+    const auto validation = ValidateConfig();
+    if (!validation.has_value()) {
+      return TaskResult(std::unexpected(validation.error()));
     }
 
     const auto deadline = ComputeDeadline();
@@ -105,7 +146,7 @@ class ICEBERG_EXPORT RetryRunner {
         *attempt_counter = attempt;
       }
 
-      auto result = task();
+      auto result = std::invoke(task);
       if (result.has_value()) {
         return result;
       }
@@ -121,89 +162,38 @@ class ICEBERG_EXPORT RetryRunner {
   }
 
  private:
+  enum class RetryPolicyMode {
+    // No retry policy was selected; invalid when retries are enabled.
+    kUnset,
+    // Retry only errors listed in retry_error_kinds_.
+    kOnlyRetryOn,
+    // Retry all errors except those listed in retry_error_kinds_.
+    kStopRetryOn,
+  };
+
   using Clock = std::chrono::steady_clock;
   using Duration = std::chrono::milliseconds;
   using TimePoint = Clock::time_point;
 
-  std::optional<TimePoint> ComputeDeadline() const {
-    if (config_.total_timeout_ms <= 0) {
-      return std::nullopt;
-    }
-    return Clock::now() + Duration(config_.total_timeout_ms);
-  }
-
-  bool HasTimedOut(const std::optional<TimePoint>& deadline) const {
-    return deadline.has_value() && Clock::now() >= *deadline;
-  }
+  /// \brief Validate retry counts, timing bounds, and the selected retry policy.
+  Status ValidateConfig() const;
+  std::optional<TimePoint> ComputeDeadline() const;
+  bool HasTimedOut(const std::optional<TimePoint>& deadline) const;
 
   /// \brief Check if the given error kind should trigger a retry.
-  bool ShouldRetry(ErrorKind kind) const {
-    if (!only_retry_on_.empty()) {
-      return std::ranges::any_of(only_retry_on_,
-                                 [kind](ErrorKind k) { return kind == k; });
-    }
-
-    if (!stop_retry_on_.empty()) {
-      return !std::ranges::any_of(stop_retry_on_,
-                                  [kind](ErrorKind k) { return kind == k; });
-    }
-
-    return true;
-  }
-
+  bool ShouldRetry(ErrorKind kind) const;
   bool CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts,
-                const std::optional<TimePoint>& deadline) const {
-    return attempt < max_attempts && !HasTimedOut(deadline) && ShouldRetry(kind);
-  }
-
+                const std::optional<TimePoint>& deadline) const;
   std::optional<Duration> RetryDelayWithinBudget(
-      int32_t attempt, const std::optional<TimePoint>& deadline) const {
-    const auto delay = Duration(CalculateDelay(attempt));
-    if (!deadline.has_value()) {
-      return delay;
-    }
-
-    const auto now = Clock::now();
-    if (now >= *deadline) {
-      return std::nullopt;
-    }
-
-    const auto remaining = std::chrono::duration_cast<Duration>(*deadline - now);
-    if (remaining <= Duration::zero() || delay >= remaining) {
-      return std::nullopt;
-    }
-
-    return delay;
-  }
-
+      int32_t attempt, const std::optional<TimePoint>& deadline) const;
   bool WaitForNextAttempt(int32_t attempt,
-                          const std::optional<TimePoint>& deadline) const {
-    const auto delay = RetryDelayWithinBudget(attempt, deadline);
-    if (!delay.has_value()) {
-      return false;
-    }
-
-    std::this_thread::sleep_for(*delay);
-    return !HasTimedOut(deadline);
-  }
-
+                          const std::optional<TimePoint>& deadline) const;
   /// \brief Calculate delay with exponential backoff and jitter
-  int32_t CalculateDelay(int32_t attempt) const {
-    // Calculate base delay with exponential backoff
-    double base_delay = config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1);
-    int32_t delay_ms = static_cast<int32_t>(
-        std::min(base_delay, static_cast<double>(config_.max_wait_ms)));
-
-    static thread_local std::mt19937 gen(std::random_device{}());
-    int32_t jitter_range = std::max(1, delay_ms / 10);
-    std::uniform_int_distribution<> dis(0, jitter_range - 1);
-    delay_ms += dis(gen);
-    return std::max(1, delay_ms);
-  }
+  int32_t CalculateDelay(int32_t attempt) const;
 
   RetryConfig config_;
-  std::vector<ErrorKind> only_retry_on_;
-  std::vector<ErrorKind> stop_retry_on_;
+  RetryPolicyMode retry_policy_mode_ = RetryPolicyMode::kUnset;
+  std::vector<ErrorKind> retry_error_kinds_;
 };
 
 /// \brief Helper function to create a RetryRunner with table commit configuration
diff --git a/src/iceberg/util/retry_util_internal.h b/src/iceberg/util/retry_util_internal.h
new file mode 100644
index 00000000..f52dc873
--- /dev/null
+++ b/src/iceberg/util/retry_util_internal.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+#include <functional>
+
+#include "iceberg/iceberg_export.h"
+
+namespace iceberg {
+
+struct RetryTestHooks {  // Three optional callbacks; NOTE(review): consumer is retry_util.cc.
+  using Clock = std::chrono::steady_clock;
+  using Duration = std::chrono::milliseconds;  // Delays are whole milliseconds.
+  using TimePoint = Clock::time_point;
+
+  std::function<TimePoint()> now;  // Yields a time point; presumably stands in for Clock::now().
+  std::function<void(Duration)> sleep_for;  // Takes a delay; presumably replaces the real sleep.
+  std::function<int32_t(int32_t)> jitter;  // int32 -> int32; presumably range -> jitter (confirm).
+};
+
+ICEBERG_EXPORT const RetryTestHooks* GetActiveRetryTestHooks();  // Presumably null if unset.
+ICEBERG_EXPORT void SetActiveRetryTestHooks(const RetryTestHooks* hooks);  // Takes a raw pointer.
+
+class ScopedRetryTestHooks {  // Scope guard: installs hooks on entry, restores on exit.
+ public:
+  explicit ScopedRetryTestHooks(const RetryTestHooks& hooks)
+      : previous_hooks_(GetActiveRetryTestHooks()) {  // Save whatever was active before.
+    SetActiveRetryTestHooks(&hooks);  // Address only: `hooks` presumably must outlive this.
+  }
+
+  ScopedRetryTestHooks(const ScopedRetryTestHooks&) = delete;  // Not copyable...
+  ScopedRetryTestHooks& operator=(const ScopedRetryTestHooks&) = delete;
+  ScopedRetryTestHooks(ScopedRetryTestHooks&&) = delete;  // ...and not movable.
+  ScopedRetryTestHooks& operator=(ScopedRetryTestHooks&&) = delete;
+
+  ~ScopedRetryTestHooks() { SetActiveRetryTestHooks(previous_hooks_); }  // Put saved value back.
+
+ private:
+  const RetryTestHooks* previous_hooks_;  // Result of GetActiveRetryTestHooks() at construction.
+};
+
+}  // namespace iceberg


Reply via email to