This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ff422295 refactor(util): make retry policy explicit and deterministic (#633)
ff422295 is described below
commit ff4222958eb47ab41dc6603e945318c8f3857316
Author: Gang Wu <[email protected]>
AuthorDate: Wed May 6 16:04:16 2026 +0800
refactor(util): make retry policy explicit and deterministic (#633)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/meson.build | 1 +
src/iceberg/test/retry_util_test.cc | 399 ++++++++++++++++++++++++++++++---
src/iceberg/util/retry_util.cc | 186 +++++++++++++++
src/iceberg/util/retry_util.h | 164 +++++++-------
src/iceberg/util/retry_util_internal.h | 61 +++++
6 files changed, 689 insertions(+), 123 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 6d190cb6..02099f6b 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -104,6 +104,7 @@ set(ICEBERG_SOURCES
util/gzip_internal.cc
util/murmurhash3_internal.cc
util/property_util.cc
+ util/retry_util.cc
util/snapshot_util.cc
util/string_util.cc
util/struct_like_set.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 17ed723f..41a5c2dd 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -126,6 +126,7 @@ iceberg_sources = files(
'util/gzip_internal.cc',
'util/murmurhash3_internal.cc',
'util/property_util.cc',
+ 'util/retry_util.cc',
'util/snapshot_util.cc',
'util/string_util.cc',
'util/struct_like_set.cc',
diff --git a/src/iceberg/test/retry_util_test.cc b/src/iceberg/test/retry_util_test.cc
index 05c7cb0e..ead22191 100644
--- a/src/iceberg/test/retry_util_test.cc
+++ b/src/iceberg/test/retry_util_test.cc
@@ -19,15 +19,73 @@
#include "iceberg/util/retry_util.h"
-#include <chrono>
-#include <thread>
+#include <concepts>
+#include <limits>
+#include <vector>
#include <gtest/gtest.h>
#include "iceberg/result.h"
#include "iceberg/test/matchers.h"
+#include "iceberg/util/retry_util_internal.h"
namespace iceberg {
+namespace {
+
+struct ResultReturningTask {
+ Result<int> operator()() const { return 1; }
+};
+
+struct NonResultReturningTask {
+ int operator()() const { return 1; }
+};
+
+static_assert(detail::RetryTask<ResultReturningTask>);
+static_assert(!detail::RetryTask<NonResultReturningTask>);
+static_assert(requires(RetryRunner runner, ResultReturningTask task) {
+ { runner.Run(task) } -> std::same_as<Result<int>>;
+});
+
+class FakeRetryEnvironment {
+ public:
+ using Duration = RetryTestHooks::Duration;
+ using TimePoint = RetryTestHooks::TimePoint;
+
+ FakeRetryEnvironment() {
+ hooks_.now = [this]() { return now_; };
+ hooks_.sleep_for = [this](Duration duration) {
+ sleep_durations_.push_back(duration);
+ now_ += duration;
+ };
+ hooks_.jitter = [this](int32_t base_delay_ms) {
+ observed_base_delays_ms_.push_back(base_delay_ms);
+ return base_delay_ms + jitter_offset_ms_;
+ };
+ }
+
+ void Advance(Duration duration) { now_ += duration; }
+
+ void SetJitterOffsetMs(int32_t jitter_offset_ms) {
+ jitter_offset_ms_ = jitter_offset_ms;
+ }
+
+ const RetryTestHooks& hooks() const { return hooks_; }
+
+ const std::vector<Duration>& sleep_durations() const { return sleep_durations_; }
+
+ const std::vector<int32_t>& observed_base_delays_ms() const {
+ return observed_base_delays_ms_;
+ }
+
+ private:
+ RetryTestHooks hooks_;
+ TimePoint now_{};
+ int32_t jitter_offset_ms_ = 0;
+ std::vector<Duration> sleep_durations_;
+ std::vector<int32_t> observed_base_delays_ms_;
+};
+
+} // namespace
TEST(RetryRunnerTest, SuccessOnFirstAttempt) {
int call_count = 0;
@@ -37,6 +95,7 @@ TEST(RetryRunnerTest, SuccessOnFirstAttempt) {
.min_wait_ms = 1,
.max_wait_ms = 10,
.total_timeout_ms = 5000})
+ .OnlyRetryOn(ErrorKind::kCommitFailed)
.Run(
[&]() -> Result<int> {
++call_count;
@@ -58,6 +117,7 @@ TEST(RetryRunnerTest, RetryOnceThenSucceed) {
.min_wait_ms = 1,
.max_wait_ms = 10,
.total_timeout_ms = 5000})
+ .OnlyRetryOn(ErrorKind::kCommitFailed)
.Run(
[&]() -> Result<int> {
++call_count;
@@ -82,6 +142,7 @@ TEST(RetryRunnerTest, MaxAttemptsExhausted) {
.min_wait_ms = 1,
.max_wait_ms = 10,
.total_timeout_ms = 5000})
+ .OnlyRetryOn(ErrorKind::kCommitFailed)
.Run(
[&]() -> Result<int> {
++call_count;
@@ -140,6 +201,32 @@ TEST(RetryRunnerTest, OnlyRetryOnMatchingError) {
EXPECT_EQ(attempts, 3);
}
+TEST(RetryRunnerTest, OnlyRetryOnTakesPrecedenceOverStopRetryOn) {
+ int call_count = 0;
+ int32_t attempts = 0;
+
+ auto result = RetryRunner(RetryConfig{.num_retries = 2,
+ .min_wait_ms = 1,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 5000})
+ .OnlyRetryOn(ErrorKind::kCommitFailed)
+ .StopRetryOn(ErrorKind::kCommitFailed)
+ .Run(
+ [&]() -> Result<int> {
+ ++call_count;
+ if (call_count == 1) {
+ return CommitFailed("transient");
+ }
+ return 100;
+ },
+ &attempts);
+
+ EXPECT_THAT(result, IsOk());
+ EXPECT_EQ(*result, 100);
+ EXPECT_EQ(call_count, 2);
+ EXPECT_EQ(attempts, 2);
+}
+
TEST(RetryRunnerTest, StopRetryOnMatchingError) {
int call_count = 0;
int32_t attempts = 0;
@@ -148,7 +235,7 @@ TEST(RetryRunnerTest, StopRetryOnMatchingError) {
.min_wait_ms = 1,
.max_wait_ms = 10,
.total_timeout_ms = 5000})
- .StopRetryOn({ErrorKind::kCommitStateUnknown})
+ .StopRetryOn(ErrorKind::kCommitStateUnknown)
.Run(
[&]() -> Result<int> {
++call_count;
@@ -161,14 +248,40 @@ TEST(RetryRunnerTest, StopRetryOnMatchingError) {
EXPECT_EQ(attempts, 1);
}
-TEST(RetryRunnerTest, ZeroRetries) {
+TEST(RetryRunnerTest, StopRetryOnNonMatchingErrorAllowsRetry) {
int call_count = 0;
int32_t attempts = 0;
- auto result = RetryRunner(RetryConfig{.num_retries = 0,
+ auto result = RetryRunner(RetryConfig{.num_retries = 2,
.min_wait_ms = 1,
.max_wait_ms = 10,
.total_timeout_ms = 5000})
+ .StopRetryOn({ErrorKind::kCommitStateUnknown})
+ .Run(
+ [&]() -> Result<int> {
+ ++call_count;
+ if (call_count == 1) {
+ return CommitFailed("retryable");
+ }
+ return 88;
+ },
+ &attempts);
+
+ EXPECT_THAT(result, IsOk());
+ EXPECT_EQ(*result, 88);
+ EXPECT_EQ(call_count, 2);
+ EXPECT_EQ(attempts, 2);
+}
+
+TEST(RetryRunnerTest, ZeroRetriesAllowsUnsetPolicyAndSkipsBackoffValidation) {
+ int call_count = 0;
+ int32_t attempts = 0;
+
+ auto result = RetryRunner(RetryConfig{.num_retries = 0,
+ .min_wait_ms = 0,
+ .max_wait_ms = 0,
+ .total_timeout_ms = 5000,
+ .scale_factor = 0.5})
.Run(
[&]() -> Result<int> {
++call_count;
@@ -181,7 +294,159 @@ TEST(RetryRunnerTest, ZeroRetries) {
EXPECT_EQ(attempts, 1);
}
+TEST(RetryRunnerTest, NegativeRetriesFailsBeforeTaskRuns) {
+ int call_count = 0;
+ int32_t attempts = 0;
+
+ auto result = RetryRunner(RetryConfig{.num_retries = -1,
+ .min_wait_ms = 1,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 5000})
+ .Run(
+ [&]() -> Result<int> {
+ ++call_count;
+ return 1;
+ },
+ &attempts);
+
+ EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+ EXPECT_THAT(result, HasErrorMessage("num_retries must be non-negative"));
+ EXPECT_EQ(call_count, 0);
+ EXPECT_EQ(attempts, 0);
+}
+
+TEST(RetryRunnerTest, InvalidBackoffConfigFailsBeforeTaskRuns) {
+ struct InvalidConfigCase {
+ RetryConfig config;
+ const char* expected_message;
+ };
+
+ const std::vector<InvalidConfigCase> test_cases = {
+ {.config = RetryConfig{.num_retries = std::numeric_limits<int32_t>::max(),
+ .min_wait_ms = 1,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 5000},
+ .expected_message = "num_retries is too large"},
+ {.config = RetryConfig{.num_retries = 1,
+ .min_wait_ms = 0,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 5000},
+ .expected_message = "min_wait_ms must be positive"},
+ {.config = RetryConfig{.num_retries = 1,
+ .min_wait_ms = 1,
+ .max_wait_ms = 0,
+ .total_timeout_ms = 5000},
+ .expected_message = "max_wait_ms must be positive"},
+ {.config = RetryConfig{.num_retries = 1,
+ .min_wait_ms = 20,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 5000},
+ .expected_message = "max_wait_ms must be greater than or equal to min_wait_ms"},
+ {.config = RetryConfig{.num_retries = 1,
+ .min_wait_ms = 1,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 5000,
+ .scale_factor = 0.5},
+ .expected_message = "scale_factor must be finite and at least 1.0"},
+ {.config = RetryConfig{.num_retries = 1,
+ .min_wait_ms = 1,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 5000,
+ .scale_factor = std::numeric_limits<double>::infinity()},
+ .expected_message = "scale_factor must be finite and at least 1.0"},
+ };
+
+ for (const auto& test_case : test_cases) {
+ int call_count = 0;
+ int32_t attempts = 0;
+
+ auto result = RetryRunner(test_case.config)
+ .OnlyRetryOn(ErrorKind::kCommitFailed)
+ .Run(
+ [&]() -> Result<int> {
+ ++call_count;
+ return 1;
+ },
+ &attempts);
+
+ EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument))
+ << test_case.expected_message;
+ EXPECT_THAT(result, HasErrorMessage(test_case.expected_message));
+ EXPECT_EQ(call_count, 0);
+ EXPECT_EQ(attempts, 0);
+ }
+}
+
+TEST(RetryRunnerTest, UnsetRetryPolicyFailsBeforeTaskRuns) {
+ int call_count = 0;
+ int32_t attempts = 0;
+
+ auto result = RetryRunner(RetryConfig{.num_retries = 1,
+ .min_wait_ms = 1,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 5000})
+ .Run(
+ [&]() -> Result<int> {
+ ++call_count;
+ return CommitFailed("fail");
+ },
+ &attempts);
+
+ EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+ EXPECT_THAT(result, HasErrorMessage("Retry policy must be explicitly configured"));
+ EXPECT_EQ(call_count, 0);
+ EXPECT_EQ(attempts, 0);
+}
+
+TEST(RetryRunnerTest, EmptyOnlyRetryOnPolicyFailsBeforeTaskRuns) {
+ int call_count = 0;
+ int32_t attempts = 0;
+
+ auto result = RetryRunner(RetryConfig{.num_retries = 1,
+ .min_wait_ms = 1,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 5000})
+ .OnlyRetryOn(std::initializer_list<ErrorKind>{})
+ .Run(
+ [&]() -> Result<int> {
+ ++call_count;
+ return CommitFailed("fail");
+ },
+ &attempts);
+
+ EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+ EXPECT_THAT(result,
+ HasErrorMessage("Retry policy must include at least one error kind"));
+ EXPECT_EQ(call_count, 0);
+ EXPECT_EQ(attempts, 0);
+}
+
+TEST(RetryRunnerTest, EmptyStopRetryOnPolicyFailsBeforeTaskRuns) {
+ int call_count = 0;
+ int32_t attempts = 0;
+
+ auto result = RetryRunner(RetryConfig{.num_retries = 1,
+ .min_wait_ms = 1,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 5000})
+ .StopRetryOn({})
+ .Run(
+ [&]() -> Result<int> {
+ ++call_count;
+ return CommitFailed("fail");
+ },
+ &attempts);
+
+ EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+ EXPECT_THAT(result,
+ HasErrorMessage("Retry policy must include at least one error kind"));
+ EXPECT_EQ(call_count, 0);
+ EXPECT_EQ(attempts, 0);
+}
+
TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) {
+ FakeRetryEnvironment fake_retry;
+ ScopedRetryTestHooks scoped_hooks(fake_retry.hooks());
int call_count = 0;
int32_t attempts = 0;
@@ -189,14 +454,12 @@ TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) {
.min_wait_ms = 20,
.max_wait_ms = 20,
.total_timeout_ms = 15})
+ .OnlyRetryOn(ErrorKind::kCommitFailed)
.Run(
[&]() -> Result<int> {
++call_count;
- // The first failure consumes most of the 15 ms budget, so the
- // next 20 ms backoff should prevent another attempt from
- // starting.
if (call_count == 1) {
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ fake_retry.Advance(FakeRetryEnvironment::Duration(10));
}
return CommitFailed("retry budget exhausted");
},
@@ -205,6 +468,97 @@ TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) {
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
EXPECT_EQ(call_count, 1);
EXPECT_EQ(attempts, 1);
+ EXPECT_TRUE(fake_retry.sleep_durations().empty());
+ EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector<int32_t>({20}));
+}
+
+TEST(RetryRunnerTest, TotalTimeoutStopsWhenDelayEqualsRemainingBudget) {
+ FakeRetryEnvironment fake_retry;
+ ScopedRetryTestHooks scoped_hooks(fake_retry.hooks());
+ int call_count = 0;
+ int32_t attempts = 0;
+
+ auto result = RetryRunner(RetryConfig{.num_retries = 3,
+ .min_wait_ms = 10,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 20})
+ .OnlyRetryOn(ErrorKind::kCommitFailed)
+ .Run(
+ [&]() -> Result<int> {
+ ++call_count;
+ fake_retry.Advance(FakeRetryEnvironment::Duration(10));
+ return CommitFailed("retry budget exhausted");
+ },
+ &attempts);
+
+ EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
+ EXPECT_EQ(call_count, 1);
+ EXPECT_EQ(attempts, 1);
+ EXPECT_TRUE(fake_retry.sleep_durations().empty());
+ EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector<int32_t>({10}));
+}
+
+TEST(RetryRunnerTest, NonPositiveTotalTimeoutDisablesDeadline) {
+ FakeRetryEnvironment fake_retry;
+ ScopedRetryTestHooks scoped_hooks(fake_retry.hooks());
+ int call_count = 0;
+ int32_t attempts = 0;
+
+ auto result = RetryRunner(RetryConfig{.num_retries = 2,
+ .min_wait_ms = 10,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 0})
+ .OnlyRetryOn(ErrorKind::kCommitFailed)
+ .Run(
+ [&]() -> Result<int> {
+ ++call_count;
+ fake_retry.Advance(FakeRetryEnvironment::Duration(100));
+ if (call_count <= 2) {
+ return CommitFailed("transient");
+ }
+ return 123;
+ },
+ &attempts);
+
+ EXPECT_THAT(result, IsOk());
+ EXPECT_EQ(*result, 123);
+ EXPECT_EQ(call_count, 3);
+ EXPECT_EQ(attempts, 3);
+ EXPECT_EQ(fake_retry.sleep_durations(), std::vector<FakeRetryEnvironment::Duration>(
+ {FakeRetryEnvironment::Duration(10),
+ FakeRetryEnvironment::Duration(10)}));
+ EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector<int32_t>({10, 10}));
+}
+
+TEST(RetryRunnerTest, RetryDelayDoesNotExceedMaxWaitAfterJitter) {
+ FakeRetryEnvironment fake_retry;
+ fake_retry.SetJitterOffsetMs(100);
+ ScopedRetryTestHooks scoped_hooks(fake_retry.hooks());
+ int call_count = 0;
+ int32_t attempts = 0;
+
+ auto result = RetryRunner(RetryConfig{.num_retries = 1,
+ .min_wait_ms = 10,
+ .max_wait_ms = 10,
+ .total_timeout_ms = 0})
+ .OnlyRetryOn(ErrorKind::kCommitFailed)
+ .Run(
+ [&]() -> Result<int> {
+ ++call_count;
+ if (call_count == 1) {
+ return CommitFailed("transient");
+ }
+ return 321;
+ },
+ &attempts);
+
+ EXPECT_THAT(result, IsOk());
+ EXPECT_EQ(*result, 321);
+ EXPECT_EQ(call_count, 2);
+ EXPECT_EQ(attempts, 2);
+ EXPECT_EQ(fake_retry.sleep_durations(), std::vector<FakeRetryEnvironment::Duration>(
+ {FakeRetryEnvironment::Duration(10)}));
+ EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector<int32_t>({10}));
}
TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) {
@@ -274,31 +628,4 @@ TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) {
EXPECT_EQ(attempts, 3);
}
-TEST(RetryRunnerTest, DefaultRetryAllErrors) {
- int call_count = 0;
- int32_t attempts = 0;
-
- auto result = RetryRunner(RetryConfig{.num_retries = 3,
- .min_wait_ms = 1,
- .max_wait_ms = 10,
- .total_timeout_ms = 5000})
- .Run(
- [&]() -> Result<int> {
- ++call_count;
- if (call_count == 1) {
- return IOError("disk full");
- }
- if (call_count == 2) {
- return ValidationFailed("bad schema");
- }
- return 55;
- },
- &attempts);
-
- EXPECT_THAT(result, IsOk());
- EXPECT_EQ(*result, 55);
- EXPECT_EQ(call_count, 3);
- EXPECT_EQ(attempts, 3);
-}
-
} // namespace iceberg
diff --git a/src/iceberg/util/retry_util.cc b/src/iceberg/util/retry_util.cc
new file mode 100644
index 00000000..d6e0d509
--- /dev/null
+++ b/src/iceberg/util/retry_util.cc
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/retry_util.h"
+
+#include <algorithm>
+#include <chrono>
+#include <cmath>
+#include <cstdint>
+#include <limits>
+#include <optional>
+#include <random>
+#include <thread>
+
+#include "iceberg/util/retry_util_internal.h"
+
+namespace iceberg {
+namespace {
+
+const RetryTestHooks*& ActiveRetryTestHooks() {
+ // Keep test hooks thread-local so fake retry timing in one test thread does not
+ // leak into unrelated retry work or require synchronization around a global pointer.
+ static thread_local const RetryTestHooks* active_retry_test_hooks = nullptr;
+ return active_retry_test_hooks;
+}
+
+RetryTestHooks::TimePoint RetryNow() {
+ const auto* hooks = GetActiveRetryTestHooks();
+ if (hooks != nullptr && hooks->now) {
+ return hooks->now();
+ }
+ return RetryTestHooks::Clock::now();
+}
+
+void RetrySleepFor(RetryTestHooks::Duration duration) {
+ const auto* hooks = GetActiveRetryTestHooks();
+ if (hooks != nullptr && hooks->sleep_for) {
+ hooks->sleep_for(duration);
+ return;
+ }
+ std::this_thread::sleep_for(duration);
+}
+
+int32_t ApplyRetryJitter(int32_t base_delay_ms) {
+ const auto* hooks = GetActiveRetryTestHooks();
+ if (hooks != nullptr && hooks->jitter) {
+ return hooks->jitter(base_delay_ms);
+ }
+
+ static thread_local std::mt19937 gen(std::random_device{}());
+ const int32_t jitter_range = std::max(1, base_delay_ms / 10);
+ std::uniform_int_distribution<> dis(0, jitter_range - 1);
+ const int64_t jittered_delay_ms = static_cast<int64_t>(base_delay_ms) + dis(gen);
+ return static_cast<int32_t>(
+ std::min<int64_t>(jittered_delay_ms, std::numeric_limits<int32_t>::max()));
+}
+
+} // namespace
+
+const RetryTestHooks* GetActiveRetryTestHooks() { return ActiveRetryTestHooks(); }
+
+void SetActiveRetryTestHooks(const RetryTestHooks* hooks) {
+ ActiveRetryTestHooks() = hooks;
+}
+
+Status RetryRunner::ValidateConfig() const {
+ if (config_.num_retries < 0) {
+ return InvalidArgument("num_retries must be non-negative, got {}",
+ config_.num_retries);
+ }
+ if (config_.num_retries == 0) {
+ return {};
+ }
+ if (config_.num_retries == std::numeric_limits<int32_t>::max()) {
+ return InvalidArgument("num_retries is too large, got {}", config_.num_retries);
+ }
+ if (config_.min_wait_ms <= 0) {
+ return InvalidArgument("min_wait_ms must be positive, got {}", config_.min_wait_ms);
+ }
+ if (config_.max_wait_ms <= 0) {
+ return InvalidArgument("max_wait_ms must be positive, got {}", config_.max_wait_ms);
+ }
+ if (config_.max_wait_ms < config_.min_wait_ms) {
+ return InvalidArgument("max_wait_ms must be greater than or equal to min_wait_ms");
+ }
+ if (!std::isfinite(config_.scale_factor) || config_.scale_factor < 1.0) {
+ return InvalidArgument("scale_factor must be finite and at least 1.0, got {}",
+ config_.scale_factor);
+ }
+ if (retry_policy_mode_ == RetryPolicyMode::kUnset) {
+ return InvalidArgument(
+ "Retry policy must be explicitly configured with OnlyRetryOn(...) or "
+ "StopRetryOn(...) when num_retries > 0");
+ }
+ if (retry_error_kinds_.empty()) {
+ return InvalidArgument("Retry policy must include at least one error kind");
+ }
+
+ return {};
+}
+
+std::optional<RetryRunner::TimePoint> RetryRunner::ComputeDeadline() const {
+ if (config_.total_timeout_ms <= 0) {
+ return std::nullopt;
+ }
+ return RetryNow() + Duration(config_.total_timeout_ms);
+}
+
+bool RetryRunner::HasTimedOut(const std::optional<TimePoint>& deadline) const {
+ return deadline.has_value() && RetryNow() >= *deadline;
+}
+
+bool RetryRunner::ShouldRetry(ErrorKind kind) const {
+ const bool policy_contains_kind = std::ranges::contains(retry_error_kinds_, kind);
+ switch (retry_policy_mode_) {
+ case RetryPolicyMode::kOnlyRetryOn:
+ return policy_contains_kind;
+ case RetryPolicyMode::kStopRetryOn:
+ return !policy_contains_kind;
+ case RetryPolicyMode::kUnset:
+ return false;
+ }
+ return false;
+}
+
+bool RetryRunner::CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts,
+ const std::optional<TimePoint>& deadline) const {
+ return attempt < max_attempts && !HasTimedOut(deadline) && ShouldRetry(kind);
+}
+
+std::optional<RetryRunner::Duration> RetryRunner::RetryDelayWithinBudget(
+ int32_t attempt, const std::optional<TimePoint>& deadline) const {
+ const auto delay = Duration(CalculateDelay(attempt));
+ if (!deadline.has_value()) {
+ return delay;
+ }
+
+ const auto now = RetryNow();
+ if (now >= *deadline) {
+ return std::nullopt;
+ }
+
+ const auto remaining = std::chrono::duration_cast<Duration>(*deadline - now);
+ if (remaining <= Duration::zero() || delay >= remaining) {
+ return std::nullopt;
+ }
+
+ return delay;
+}
+
+bool RetryRunner::WaitForNextAttempt(int32_t attempt,
+ const std::optional<TimePoint>& deadline) const {
+ const auto delay = RetryDelayWithinBudget(attempt, deadline);
+ if (!delay.has_value()) {
+ return false;
+ }
+
+ RetrySleepFor(*delay);
+ return !HasTimedOut(deadline);
+}
+
+int32_t RetryRunner::CalculateDelay(int32_t attempt) const {
+ const double base_delay =
+ config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1);
+ const int32_t delay_ms = static_cast<int32_t>(
+ std::min(base_delay, static_cast<double>(config_.max_wait_ms)));
+ return std::clamp(ApplyRetryJitter(delay_ms), 1, config_.max_wait_ms);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/util/retry_util.h b/src/iceberg/util/retry_util.h
index 7041a40e..83e2cd5c 100644
--- a/src/iceberg/util/retry_util.h
+++ b/src/iceberg/util/retry_util.h
@@ -19,12 +19,14 @@
#pragma once
-#include <algorithm>
#include <chrono>
-#include <cmath>
+#include <concepts>
+#include <cstdint>
+#include <functional>
+#include <initializer_list>
#include <optional>
-#include <random>
-#include <thread>
+#include <type_traits>
+#include <utility>
#include <vector>
#include "iceberg/iceberg_export.h"
@@ -32,6 +34,27 @@
namespace iceberg {
+namespace detail {
+
+template <typename T>
+struct IsResult : std::false_type {};
+
+template <typename T>
+struct IsResult<Result<T>> : std::true_type {};
+
+template <typename T>
+concept ResultType = IsResult<std::remove_cvref_t<T>>::value;
+
+template <typename F>
+concept RetryTask = requires(F& f) {
+ { std::invoke(f) } -> ResultType;
+};
+
+template <typename F>
+using RetryTaskResult = std::remove_cvref_t<std::invoke_result_t<F&>>;
+
+} // namespace detail
+
/// \brief Configuration for retry behavior
struct ICEBERG_EXPORT RetryConfig {
/// Maximum number of retry attempts (not including the first attempt)
@@ -47,6 +70,9 @@ struct ICEBERG_EXPORT RetryConfig {
};
/// \brief Utility class for running tasks with retry logic
+///
+/// When retries are enabled (`num_retries > 0`), callers must explicitly configure
+/// retry policy with `OnlyRetryOn(...)` or `StopRetryOn(...)`.
class ICEBERG_EXPORT RetryRunner {
public:
/// \brief Construct a RetryRunner with the given configuration
@@ -60,7 +86,8 @@ class ICEBERG_EXPORT RetryRunner {
/// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
/// StopRetryOn is ignored.
RetryRunner& OnlyRetryOn(std::initializer_list<ErrorKind> error_kinds) {
- only_retry_on_ = std::vector<ErrorKind>(error_kinds);
+ retry_policy_mode_ = RetryPolicyMode::kOnlyRetryOn;
+ retry_error_kinds_ = std::vector<ErrorKind>(error_kinds);
return *this;
}
@@ -68,10 +95,7 @@ class ICEBERG_EXPORT RetryRunner {
///
/// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
/// StopRetryOn is ignored.
- RetryRunner& OnlyRetryOn(ErrorKind error_kind) {
- only_retry_on_ = std::vector<ErrorKind>{error_kind};
- return *this;
- }
+ RetryRunner& OnlyRetryOn(ErrorKind error_kind) { return OnlyRetryOn({error_kind}); }
/// \brief Specify error types that should stop retries immediately.
///
@@ -81,18 +105,35 @@ class ICEBERG_EXPORT RetryRunner {
/// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
/// StopRetryOn is ignored.
RetryRunner& StopRetryOn(std::initializer_list<ErrorKind> error_kinds) {
- stop_retry_on_ = std::vector<ErrorKind>(error_kinds);
+ if (retry_policy_mode_ == RetryPolicyMode::kOnlyRetryOn) {
+ return *this;
+ }
+
+ retry_policy_mode_ = RetryPolicyMode::kStopRetryOn;
+ retry_error_kinds_ = std::vector<ErrorKind>(error_kinds);
return *this;
}
+ /// \brief Specify a single error type that should stop retries immediately.
+ ///
+ /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
+ /// StopRetryOn is ignored.
+ RetryRunner& StopRetryOn(ErrorKind error_kind) { return StopRetryOn({error_kind}); }
+
/// \brief Run a task that returns a Result<T>
///
+ /// When `num_retries > 0`, the retry policy must be configured explicitly via
+ /// `OnlyRetryOn(...)` or `StopRetryOn(...)`.
+ ///
/// TODO: Replace attempt_counter with a metrics reporter once it is available.
- template <typename F, typename T = typename std::invoke_result_t<F>::value_type>
- Result<T> Run(F&& task, int32_t* attempt_counter = nullptr) {
- if (config_.num_retries < 0) {
- return InvalidArgument("num_retries must be non-negative, got {}",
- config_.num_retries);
+ template <typename F>
+ requires detail::RetryTask<F>
+ auto Run(F&& task, int32_t* attempt_counter = nullptr) -> detail::RetryTaskResult<F> {
+ using TaskResult = detail::RetryTaskResult<F>;
+
+ const auto validation = ValidateConfig();
+ if (!validation.has_value()) {
+ return TaskResult(std::unexpected(validation.error()));
}
const auto deadline = ComputeDeadline();
@@ -105,7 +146,7 @@ class ICEBERG_EXPORT RetryRunner {
*attempt_counter = attempt;
}
- auto result = task();
+ auto result = std::invoke(task);
if (result.has_value()) {
return result;
}
@@ -121,89 +162,38 @@ class ICEBERG_EXPORT RetryRunner {
}
private:
+ enum class RetryPolicyMode {
+ // No retry policy was selected; invalid when retries are enabled.
+ kUnset,
+ // Retry only errors listed in retry_error_kinds_.
+ kOnlyRetryOn,
+ // Retry all errors except those listed in retry_error_kinds_.
+ kStopRetryOn,
+ };
+
using Clock = std::chrono::steady_clock;
using Duration = std::chrono::milliseconds;
using TimePoint = Clock::time_point;
- std::optional<TimePoint> ComputeDeadline() const {
- if (config_.total_timeout_ms <= 0) {
- return std::nullopt;
- }
- return Clock::now() + Duration(config_.total_timeout_ms);
- }
-
- bool HasTimedOut(const std::optional<TimePoint>& deadline) const {
- return deadline.has_value() && Clock::now() >= *deadline;
- }
+ /// \brief Validate retry counts, timing bounds, and the selected retry policy.
+ Status ValidateConfig() const;
+ std::optional<TimePoint> ComputeDeadline() const;
+ bool HasTimedOut(const std::optional<TimePoint>& deadline) const;
/// \brief Check if the given error kind should trigger a retry.
- bool ShouldRetry(ErrorKind kind) const {
- if (!only_retry_on_.empty()) {
- return std::ranges::any_of(only_retry_on_,
- [kind](ErrorKind k) { return kind == k; });
- }
-
- if (!stop_retry_on_.empty()) {
- return !std::ranges::any_of(stop_retry_on_,
- [kind](ErrorKind k) { return kind == k; });
- }
-
- return true;
- }
-
+ bool ShouldRetry(ErrorKind kind) const;
bool CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts,
- const std::optional<TimePoint>& deadline) const {
- return attempt < max_attempts && !HasTimedOut(deadline) && ShouldRetry(kind);
- }
-
+ const std::optional<TimePoint>& deadline) const;
std::optional<Duration> RetryDelayWithinBudget(
- int32_t attempt, const std::optional<TimePoint>& deadline) const {
- const auto delay = Duration(CalculateDelay(attempt));
- if (!deadline.has_value()) {
- return delay;
- }
-
- const auto now = Clock::now();
- if (now >= *deadline) {
- return std::nullopt;
- }
-
- const auto remaining = std::chrono::duration_cast<Duration>(*deadline - now);
- if (remaining <= Duration::zero() || delay >= remaining) {
- return std::nullopt;
- }
-
- return delay;
- }
-
+ int32_t attempt, const std::optional<TimePoint>& deadline) const;
bool WaitForNextAttempt(int32_t attempt,
- const std::optional<TimePoint>& deadline) const {
- const auto delay = RetryDelayWithinBudget(attempt, deadline);
- if (!delay.has_value()) {
- return false;
- }
-
- std::this_thread::sleep_for(*delay);
- return !HasTimedOut(deadline);
- }
-
+ const std::optional<TimePoint>& deadline) const;
/// \brief Calculate delay with exponential backoff and jitter
- int32_t CalculateDelay(int32_t attempt) const {
- // Calculate base delay with exponential backoff
- double base_delay = config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1);
- int32_t delay_ms = static_cast<int32_t>(
- std::min(base_delay, static_cast<double>(config_.max_wait_ms)));
-
- static thread_local std::mt19937 gen(std::random_device{}());
- int32_t jitter_range = std::max(1, delay_ms / 10);
- std::uniform_int_distribution<> dis(0, jitter_range - 1);
- delay_ms += dis(gen);
- return std::max(1, delay_ms);
- }
+ int32_t CalculateDelay(int32_t attempt) const;
RetryConfig config_;
- std::vector<ErrorKind> only_retry_on_;
- std::vector<ErrorKind> stop_retry_on_;
+ RetryPolicyMode retry_policy_mode_ = RetryPolicyMode::kUnset;
+ std::vector<ErrorKind> retry_error_kinds_;
};
/// \brief Helper function to create a RetryRunner with table commit configuration
diff --git a/src/iceberg/util/retry_util_internal.h b/src/iceberg/util/retry_util_internal.h
new file mode 100644
index 00000000..f52dc873
--- /dev/null
+++ b/src/iceberg/util/retry_util_internal.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+#include <functional>
+
+#include "iceberg/iceberg_export.h"
+
+namespace iceberg {
+
+struct RetryTestHooks {
+ using Clock = std::chrono::steady_clock;
+ using Duration = std::chrono::milliseconds;
+ using TimePoint = Clock::time_point;
+
+ std::function<TimePoint()> now;
+ std::function<void(Duration)> sleep_for;
+ std::function<int32_t(int32_t)> jitter;
+};
+
+ICEBERG_EXPORT const RetryTestHooks* GetActiveRetryTestHooks();
+ICEBERG_EXPORT void SetActiveRetryTestHooks(const RetryTestHooks* hooks);
+
+class ScopedRetryTestHooks {
+ public:
+ explicit ScopedRetryTestHooks(const RetryTestHooks& hooks)
+ : previous_hooks_(GetActiveRetryTestHooks()) {
+ SetActiveRetryTestHooks(&hooks);
+ }
+
+ ScopedRetryTestHooks(const ScopedRetryTestHooks&) = delete;
+ ScopedRetryTestHooks& operator=(const ScopedRetryTestHooks&) = delete;
+ ScopedRetryTestHooks(ScopedRetryTestHooks&&) = delete;
+ ScopedRetryTestHooks& operator=(ScopedRetryTestHooks&&) = delete;
+
+ ~ScopedRetryTestHooks() { SetActiveRetryTestHooks(previous_hooks_); }
+
+ private:
+ const RetryTestHooks* previous_hooks_;
+};
+
+} // namespace iceberg