This is an automated email from the ASF dual-hosted git repository.

singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 3452153ed2 Core: Replace Failsafe with Tasks utility in RESTTableScan (#15613)
3452153ed2 is described below

commit 3452153ed2e8e9ee897c1a9a76a3264df9b2855b
Author: Prashant Singh <[email protected]>
AuthorDate: Fri Mar 13 12:24:23 2026 -0700

    Core: Replace Failsafe with Tasks utility in RESTTableScan (#15613)
    
    Replace the Failsafe library dependency in RESTTableScan.fetchPlanningResult()
    with Iceberg's built-in Tasks utility for retry/backoff, aligning with codebase
    conventions and removing the failsafe dependency from iceberg-core.
    
    Co-authored-by: Prashant Singh <[email protected]>
---
 build.gradle                                       |   1 -
 .../org/apache/iceberg/rest/RESTTableScan.java     | 106 ++++++++-------------
 2 files changed, 40 insertions(+), 67 deletions(-)

diff --git a/build.gradle b/build.gradle
index 765a7fe820..52d25bc33b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -379,7 +379,6 @@ project(':iceberg-core') {
     implementation libs.jackson.databind
     implementation libs.caffeine
     implementation libs.roaringbitmap
-    implementation libs.failsafe
     compileOnly(libs.hadoop3.client) {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
index f8860c42d7..f533f2c87f 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -21,13 +21,10 @@ package org.apache.iceberg.rest;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.RemovalListener;
-import dev.failsafe.Failsafe;
-import dev.failsafe.FailsafeException;
-import dev.failsafe.RetryPolicy;
-import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -50,6 +47,7 @@ import org.apache.iceberg.rest.requests.PlanTableScanRequest;
 import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
 import org.apache.iceberg.rest.responses.PlanTableScanResponse;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -228,71 +226,45 @@ class RESTTableScan extends DataTableScan {
   }
 
   private CloseableIterable<FileScanTask> fetchPlanningResult() {
-    RetryPolicy<FetchPlanningResultResponse> retryPolicy =
-        RetryPolicy.<FetchPlanningResultResponse>builder()
-            .handleResultIf(response -> response.planStatus() == 
PlanStatus.SUBMITTED)
-            .withBackoff(
-                Duration.ofMillis(MIN_SLEEP_MS), 
Duration.ofMillis(MAX_SLEEP_MS), SCALE_FACTOR)
-            .withJitter(0.1) // Add jitter up to 10% of the calculated delay
-            .withMaxAttempts(MAX_ATTEMPTS)
-            .withMaxDuration(Duration.ofMillis(MAX_WAIT_TIME_MS))
-            .onFailedAttempt(
-                e -> {
-                  // Log when a retry occurs
-                  LOG.debug(
-                      "Plan {} still SUBMITTED (Attempt {}/{}). Previous 
attempt took {} ms.",
-                      planId,
-                      e.getAttemptCount(),
-                      MAX_ATTEMPTS,
-                      e.getElapsedAttemptTime().toMillis());
-                })
-            .onFailure(
-                e -> {
-                  LOG.warn(
-                      "Polling for plan {} failed due to: {}",
-                      planId,
-                      e.getException().getMessage());
-                  cleanupPlanResources();
-                })
-            .build();
+    AtomicReference<FetchPlanningResultResponse> result = new 
AtomicReference<>();
+    Tasks.foreach(planId)
+        .exponentialBackoff(MIN_SLEEP_MS, MAX_SLEEP_MS, MAX_WAIT_TIME_MS, 
SCALE_FACTOR)
+        .retry(MAX_ATTEMPTS)
+        .onlyRetryOn(NotCompleteException.class)
+        .onFailure(
+            (id, err) -> {
+              LOG.warn("Planning failed for plan ID: {}", id, err);
+              cleanupPlanResources();
+            })
+        .throwFailureWhenFinished()
+        .run(
+            id -> {
+              FetchPlanningResultResponse response =
+                  client.get(
+                      resourcePaths.plan(tableIdentifier, id),
+                      headers,
+                      FetchPlanningResultResponse.class,
+                      headers,
+                      ErrorHandlers.planErrorHandler(),
+                      parserContext);
 
-    try {
-      FetchPlanningResultResponse response =
-          Failsafe.with(retryPolicy)
-              .get(
-                  () ->
-                      client.get(
-                          resourcePaths.plan(tableIdentifier, planId),
-                          headers,
-                          FetchPlanningResultResponse.class,
-                          headers,
-                          ErrorHandlers.planErrorHandler(),
-                          parserContext));
-      Preconditions.checkState(
-          response.planStatus() == PlanStatus.COMPLETED,
-          "Plan finished with unexpected status %s for planId: %s",
-          response.planStatus(),
-          planId);
+              if (response.planStatus() == PlanStatus.SUBMITTED) {
+                throw new NotCompleteException();
+              } else if (response.planStatus() != PlanStatus.COMPLETED) {
+                throw new IllegalStateException(
+                    String.format(
+                        "Invalid planStatus: %s for planId: %s", 
response.planStatus(), id));
+              }
 
-      this.scanFileIO =
-          !response.credentials().isEmpty() ? 
scanFileIO(response.credentials()) : table().io();
+              result.set(response);
+            });
 
-      return scanTasksIterable(response.planTasks(), response.fileScanTasks());
-    } catch (FailsafeException e) {
-      // FailsafeException is thrown when retries are exhausted (Max 
Attempts/Duration)
-      // Cleanup is handled by the .onFailure() hook, so we just wrap and 
rethrow.
-      throw new IllegalStateException(
-          String.format("Polling timed out or exceeded max attempts for 
planId: %s.", planId), e);
-    } catch (Exception e) {
-      // Catch any immediate non-retryable exceptions (e.g., I/O errors, auth 
errors)
-      try {
-        cleanupPlanResources();
-      } catch (Exception cancelException) {
-        // Ignore cancellation failures during exception handling
-        e.addSuppressed(cancelException);
-      }
-      throw e;
-    }
+    FetchPlanningResultResponse response = result.get();
+
+    this.scanFileIO =
+        !response.credentials().isEmpty() ? scanFileIO(response.credentials()) 
: table().io();
+
+    return scanTasksIterable(response.planTasks(), response.fileScanTasks());
   }
 
   private CloseableIterable<FileScanTask> scanTasksIterable(
@@ -342,4 +314,6 @@ class RESTTableScan extends DataTableScan {
       return false;
     }
   }
+
+  private static class NotCompleteException extends RuntimeException {}
 }

Reply via email to