This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/release/v1.2 by this push:
     new 9387e15b60 fix(file-service): retry S3 bucket creation on slow startup (#5853)
9387e15b60 is described below

commit 9387e15b6048ae41f9a229405e1299763409c4c6
Author: Matthew B. <[email protected]>
AuthorDate: Mon Jun 22 17:26:25 2026 +0000

    fix(file-service): retry S3 bucket creation on slow startup (#5853)
    
    ### What changes were proposed in this PR?
    - Add `awaitDependency` to `FileService`, an exponential-backoff retry
    (up to 6 attempts, with 5 waits doubling from 200ms, ~6.2s of backoff in
    total) with an injectable sleep, mirroring
    `LakeFSStorageClient.retryWithBackoff`.
    - Wrap the two `S3StorageClient.createBucketIfNotExist` calls in
    `FileService.run` with it, so a slow-to-start MinIO/S3 no longer aborts
    file-service startup.
    - Handle `InterruptedException` consistently: an interrupt arriving
    during the backoff `sleep` (not just during the bucket operation) now
    restores the thread's interrupt status and fails fast, instead of
    escaping as a raw `InterruptedException` with the interrupt flag lost.
    - Leave `LakeFSStorageClient.healthCheck()` on its existing inner retry
    (unchanged).
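    - Add `FileServiceSpec` (12 tests) covering immediate success,
    default-argument success, retry-then-success, the full backoff
    progression to give-up, give-up preserving the cause,
    `maxAttempts == 1`, interrupt-fails-fast for both interrupt points,
    success on the final allowed attempt, a custom initial delay, the
    underlying failure message on give-up, and a non-`Exception` `Throwable`
    propagating unwrapped.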
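    
    As a rough illustration of the defaults above (a sketch, not code from
    this PR; `BackoffSchedule`, `delays` and `totalWaitMillis` are
    hypothetical names), the waits form a doubling sequence with one fewer
    wait than attempts, because the last failure is rethrown without
    sleeping:
    
    ```scala
    object BackoffSchedule {
      // Waits between attempts: maxAttempts - 1 of them, starting at
      // initialDelayMillis and doubling each time (like `delayMillis *= 2`).
      def delays(maxAttempts: Int, initialDelayMillis: Long): List[Long] =
        List.iterate(initialDelayMillis, math.max(maxAttempts - 1, 0))(_ * 2)
    
      // Total time spent sleeping before giving up; the time taken by the
      // failing operation itself is not included.
      def totalWaitMillis(maxAttempts: Int, initialDelayMillis: Long): Long =
        delays(maxAttempts, initialDelayMillis).sum
    }
    ```
    
    Under these assumptions `delays(6, 200L)` is
    `List(200, 400, 800, 1600, 3200)` and `totalWaitMillis(6, 200L)` is
    6200, so "~6s" is the sleep budget before the final failure is reported;
    time spent inside each failing bucket call comes on top of it.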
    
    ### Any related issues, documentation, discussions?
    Closes: #5852
    
    Note: `awaitDependency` is a near-duplicate of
    `LakeFSStorageClient.retryWithBackoff` in `common/workflow-core`.
    Extracting a single shared helper that both delegate to is the cleaner
    end state, but it would refactor a stable, separately-tested class in
    another module, so it is deferred to a follow-up rather than widening
    the scope of this startup-race fix.
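    
    One possible shape for that shared helper, sketched here purely as an
    assumption, is a generic variant of `awaitDependency` that returns the
    operation's result. The `RetrySupport` object, its signature and the
    generic return type are hypothetical; the real signature of
    `LakeFSStorageClient.retryWithBackoff` is not shown in this commit.
    Logging is omitted to keep the sketch self-contained:
    
    ```scala
    object RetrySupport {
      /** Hypothetical shared helper: runs `operation` until it returns a value,
        * retrying on Exception with exponential backoff. An interrupt during the
        * operation or the sleep restores the thread's interrupt flag and fails
        * fast, as `awaitDependency` does.
        */
      def retryWithBackoff[T](
          description: String,
          maxAttempts: Int = 6,
          initialDelayMillis: Long = 200L,
          sleep: Long => Unit = Thread.sleep
      )(operation: => T): T = {
        def failInterrupted(ie: InterruptedException): Nothing = {
          Thread.currentThread().interrupt()
          throw new RuntimeException(s"Interrupted while waiting for $description", ie)
        }
    
        // Tail-recursive loop in place of while (true) + return.
        @annotation.tailrec
        def loop(attempt: Int, delayMillis: Long): T = {
          val outcome: Either[Exception, T] =
            try Right(operation)
            catch {
              case ie: InterruptedException => failInterrupted(ie)
              case e: Exception             => Left(e)
            }
          outcome match {
            case Right(value) => value
            case Left(e) if attempt >= maxAttempts =>
              throw new RuntimeException(
                s"$description not ready after $maxAttempts attempts: ${e.getMessage}",
                e
              )
            case Left(_) =>
              try sleep(delayMillis)
              catch { case ie: InterruptedException => failInterrupted(ie) }
              loop(attempt + 1, delayMillis * 2)
          }
        }
    
        loop(1, initialDelayMillis)
      }
    }
    ```
    
    Returning `T` rather than `Unit` would let a `Unit` caller such as
    `FileService.run` and a value-returning caller share one implementation;
    the tail-recursive loop is an equivalent alternative to the
    `while (true)`/`return` form used in this PR.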
    
    ### How was this PR tested?
    - Run `sbt "FileService/testOnly org.apache.texera.service.FileServiceSpec"`
    and expect 12 passing tests:
      - immediate success runs the operation once and never sleeps;
      - default-argument success returns on the first try without invoking
        the default `Thread.sleep` backoff;
      - retry-then-success records delays `List(200, 400)` before succeeding
        on the 3rd try;
      - exhausting all 6 attempts records the full progression
        `List(200, 400, 800, 1600, 3200)` before giving up;
      - give-up rethrows after `maxAttempts` with the original exception as
        `getCause` and the dependency name in the message;
      - `maxAttempts == 1` gives up after a single attempt without sleeping;
      - an interrupt while running the operation restores the interrupt flag
        and fails fast;
      - an interrupt while sleeping between attempts likewise restores the
        interrupt flag and fails fast;
      - with `maxAttempts == 3`, success on the 3rd attempt is not cut short
        (delays `List(200, 400)`);
      - a custom 50ms initial delay yields `List(50, 100, 200)` over 4
        attempts;
      - the give-up message includes the underlying failure message;
      - a non-`Exception` `Throwable` (`StackOverflowError`) escapes on the
        first attempt without being retried or wrapped.
    - The development environment used for this PR hits a pre-existing
    JaCoCo instrumentation error (`Unsupported class file major version 69`)
    because JaCoCo 0.8.11 cannot instrument JDK 25 class files; this is
    unrelated to the change. The spec was verified locally against a JDK 17
    toolchain (`sbt -java-home <jdk17>`, 8/8 pass) and relies on CI's
    JDK/JaCoCo combination for the standard instrumented run.
    `scalafmtCheck` is clean for both main and test sources.
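    
    For background on that error: a class file starts with the 4-byte magic
    number `0xCAFEBABE`, followed by a 2-byte minor version and a 2-byte
    major version (both big-endian), and for Java 5 (major 49) and later the
    Java release is `major - 44`, so major version 69 is JDK 25 while JDK 17
    emits 61. A small decoder, using a hypothetical `ClassFileVersion` name
    that is not part of this PR:
    
    ```scala
    object ClassFileVersion {
      // The 4-byte magic number every class file starts with.
      private val Magic = 0xCAFEBABEL
    
      /** Returns the major version, or None if `bytes` is not a class-file header. */
      def majorVersion(bytes: Array[Byte]): Option[Int] = {
        // Reads byte i as an unsigned value in 0..255.
        def u1(i: Int): Int = bytes(i) & 0xff
        if (bytes.length < 8) None
        else {
          val magic = (u1(0).toLong << 24) | (u1(1) << 16) | (u1(2) << 8) | u1(3)
          // Layout: u4 magic, u2 minor_version, u2 major_version.
          if (magic != Magic) None
          else Some((u1(6) << 8) | u1(7))
        }
      }
    
      /** Java release for a major version; valid for Java 5 (major 49) and later. */
      def javaRelease(major: Int): Int = major - 44
    }
    ```
    
    For example, passing the result of `java.nio.file.Files.readAllBytes`
    for a class compiled with `--release 25` should give `Some(69)`, which
    `javaRelease` maps back to 25.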
    
    ### Was this PR authored or co-authored using generative AI tooling?
    Co-authored with Claude Opus 4.8 in compliance with ASF
    
    (backported from commit 18e4e67c70fcc4d4c739338d9c30ce552db3decc)
---
 .../org/apache/texera/service/FileService.scala    |  56 +++++-
 .../apache/texera/service/FileServiceSpec.scala    | 189 +++++++++++++++++++++
 2 files changed, 243 insertions(+), 2 deletions(-)

diff --git a/file-service/src/main/scala/org/apache/texera/service/FileService.scala b/file-service/src/main/scala/org/apache/texera/service/FileService.scala
index 88f7650378..e75cb42a2c 100644
--- a/file-service/src/main/scala/org/apache/texera/service/FileService.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/FileService.scala
@@ -76,9 +76,13 @@ class FileService extends Application[FileServiceConfiguration] with LazyLogging
     )
 
     // check if the texera dataset bucket exists, if not create it
-    S3StorageClient.createBucketIfNotExist(StorageConfig.lakefsBucketName)
+    awaitDependency("texera dataset bucket") {
+      S3StorageClient.createBucketIfNotExist(StorageConfig.lakefsBucketName)
+    }
     // ensure the large-binary S3 bucket exists before any workflow execution attempts to use it
-    S3StorageClient.createBucketIfNotExist(LargeBinaryManager.DEFAULT_BUCKET)
+    awaitDependency("large-binary bucket") {
+      S3StorageClient.createBucketIfNotExist(LargeBinaryManager.DEFAULT_BUCKET)
+    }
     // check if we can connect to the lakeFS service
     LakeFSStorageClient.healthCheck()
 
@@ -105,6 +109,54 @@ class FileService extends Application[FileServiceConfiguration] with LazyLogging
     // Route request logs through SLF4J, controlled by TEXERA_SERVICE_LOG_LEVEL
     RequestLoggingFilter.register(environment.getApplicationContext)
   }
+
+  /**
+    * Runs `operation`, retrying with exponential backoff until it succeeds or `maxAttempts` is
+    * reached, to tolerate a slow-to-start object store. The last failure is rethrown as the cause.
+    * `sleep` is injectable for tests. Defaults: 6 attempts from 200ms (200, 400, 800, 1600, 3200), ~6s.
+    */
+  private[service] def awaitDependency(
+      description: String,
+      maxAttempts: Int = 6,
+      initialDelayMillis: Long = 200L,
+      sleep: Long => Unit = Thread.sleep
+  )(operation: => Unit): Unit = {
+    // Restore the interrupt status and fail fast rather than retrying, whether the
+    // interrupt arrives while running `operation` or while sleeping between attempts.
+    def failInterrupted(ie: InterruptedException): Nothing = {
+      Thread.currentThread().interrupt()
+      throw new RuntimeException(s"Interrupted while waiting for $description", ie)
+    }
+
+    var attempt = 1
+    var delayMillis = initialDelayMillis
+    while (true) {
+      try {
+        operation
+        return
+      } catch {
+        case ie: InterruptedException => failInterrupted(ie)
+        case e: Exception =>
+          if (attempt >= maxAttempts) {
+            throw new RuntimeException(
+              s"$description not ready after $maxAttempts attempts: ${e.getMessage}",
+              e
+            )
+          }
+          logger.warn(
+            s"$description not ready (attempt $attempt/$maxAttempts): ${e.getMessage}. " +
+              s"Retrying in ${delayMillis}ms..."
+          )
+          try {
+            sleep(delayMillis)
+          } catch {
+            case ie: InterruptedException => failInterrupted(ie)
+          }
+          attempt += 1
+          delayMillis *= 2
+      }
+    }
+  }
 }
 
 object FileService {
diff --git a/file-service/src/test/scala/org/apache/texera/service/FileServiceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/FileServiceSpec.scala
new file mode 100644
index 0000000000..30ec2c23f7
--- /dev/null
+++ b/file-service/src/test/scala/org/apache/texera/service/FileServiceSpec.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable.ListBuffer
+
+class FileServiceSpec extends AnyFlatSpec {
+
+  private val service = new FileService()
+
+  "awaitDependency" should "run the operation once and not sleep when it succeeds immediately" in {
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    service.awaitDependency("dep", 6, 200L, delays += _) {
+      attempts += 1
+    }
+    assert(attempts == 1)
+    assert(delays.isEmpty)
+  }
+
+  it should "run the operation once with the default arguments when it succeeds immediately" in {
+    // Exercises the default maxAttempts/initialDelay/sleep parameters: a first-try success
+    // returns without ever invoking the (real Thread.sleep) default backoff.
+    var attempts = 0
+    service.awaitDependency("dep") {
+      attempts += 1
+    }
+    assert(attempts == 1)
+  }
+
+  it should "retry until success and double the delay after each failed attempt" in {
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    service.awaitDependency("dep", 6, 200L, delays += _) {
+      attempts += 1
+      if (attempts < 3) throw new RuntimeException("not reachable yet")
+    }
+    assert(attempts == 3)
+    assert(delays.toList == List(200L, 400L))
+  }
+
+  it should "double the delay after every failed attempt up to maxAttempts - 1 sleeps" in {
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 6, 200L, delays += _) {
+        attempts += 1
+        throw new RuntimeException("down")
+      }
+    }
+    // 6 attempts means 5 backoff waits following the geometric progression from 200ms.
+    assert(attempts == 6)
+    assert(delays.toList == List(200L, 400L, 800L, 1600L, 3200L))
+    assert(ex.getMessage.contains("after 6 attempts"))
+  }
+
+  it should "give up after maxAttempts and preserve the last failure as the cause" in {
+    var attempts = 0
+    val cause = new RuntimeException("still down")
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 3, 200L, _ => ()) {
+        attempts += 1
+        throw cause
+      }
+    }
+    assert(attempts == 3)
+    assert(ex.getMessage.contains("after 3 attempts"))
+    assert(ex.getMessage.contains("dep"))
+    assert(ex.getCause eq cause)
+  }
+
+  it should "give up immediately without sleeping when maxAttempts is 1" in {
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    val cause = new RuntimeException("still down")
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 1, 200L, delays += _) {
+        attempts += 1
+        throw cause
+      }
+    }
+    assert(attempts == 1)
+    assert(delays.isEmpty)
+    assert(ex.getMessage.contains("after 1 attempts"))
+    assert(ex.getCause eq cause)
+  }
+
+  it should "fail fast and restore the interrupt status when the operation is interrupted" in {
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 6, 200L, _ => ()) {
+        throw new InterruptedException("interrupted")
+      }
+    }
+    // Thread.interrupted() both reads and clears the flag, so the interrupt was restored.
+    assert(Thread.interrupted())
+    assert(ex.getMessage.contains("Interrupted while waiting for dep"))
+    assert(ex.getCause.isInstanceOf[InterruptedException])
+  }
+
+  it should "fail fast and restore the interrupt status when interrupted while sleeping between attempts" in {
+    var attempts = 0
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 6, 200L, _ => throw new InterruptedException("interrupted")) {
+        attempts += 1
+        throw new RuntimeException("not reachable yet")
+      }
+    }
+    // The operation failed once, then the interrupt arrived during the backoff sleep.
+    assert(attempts == 1)
+    // Thread.interrupted() both reads and clears the flag, so the interrupt was restored.
+    assert(Thread.interrupted())
+    assert(ex.getMessage.contains("Interrupted while waiting for dep"))
+    assert(ex.getCause.isInstanceOf[InterruptedException])
+  }
+
+  it should "succeed on the final allowed attempt without giving up one try too early" in {
+    // Boundary for `attempt >= maxAttempts`: the operation only succeeds on the very last
+    // attempt, so the loop must not give up prematurely. Expect maxAttempts - 1 backoff waits.
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    service.awaitDependency("dep", 3, 200L, delays += _) {
+      attempts += 1
+      if (attempts < 3) throw new RuntimeException("not reachable yet")
+    }
+    assert(attempts == 3)
+    assert(delays.toList == List(200L, 400L))
+  }
+
+  it should "honor a custom initial delay when computing the backoff progression" in {
+    // Guards against the initial delay being hardcoded: starting from 50ms the geometric
+    // progression must be 50, 100, 200 rather than the default 200-based sequence.
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 4, 50L, delays += _) {
+        attempts += 1
+        throw new RuntimeException("down")
+      }
+    }
+    assert(attempts == 4)
+    assert(delays.toList == List(50L, 100L, 200L))
+    assert(ex.getMessage.contains("after 4 attempts"))
+  }
+
+  it should "include the underlying failure message when giving up" in {
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dataset bucket", 2, 200L, _ => ()) {
+        throw new RuntimeException("connection refused")
+      }
+    }
+    assert(ex.getMessage.contains("dataset bucket not ready after 2 attempts"))
+    assert(ex.getMessage.contains("connection refused"))
+  }
+
+  it should "propagate a non-Exception Throwable immediately without retrying or wrapping it" in {
+    // The catch clause only matches Exception, so an Error must escape on the first attempt:
+    // it is neither retried nor wrapped in the \"not ready after N attempts\" RuntimeException.
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    val err = intercept[StackOverflowError] {
+      service.awaitDependency("dep", 6, 200L, delays += _) {
+        attempts += 1
+        throw new StackOverflowError("boom")
+      }
+    }
+    assert(attempts == 1)
+    assert(delays.isEmpty)
+    assert(err.getMessage == "boom")
+  }
+}
