This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 18e4e67c70 fix(file-service): retry S3 bucket creation on slow startup (#5853)
18e4e67c70 is described below
commit 18e4e67c70fcc4d4c739338d9c30ce552db3decc
Author: Matthew B. <[email protected]>
AuthorDate: Mon Jun 22 10:13:37 2026 -0700
fix(file-service): retry S3 bucket creation on slow startup (#5853)
### What changes were proposed in this PR?
- Add `awaitDependency` to `FileService`, an exponential-backoff retry
(6 attempts from 200ms, ~6s total) with an injectable sleep, mirroring
`LakeFSStorageClient.retryWithBackoff`.
- Wrap the two `S3StorageClient.createBucketIfNotExist` calls in
`FileService.run` with it, so a slow-to-start MinIO/S3 no longer aborts
file-service startup.
- Handle `InterruptedException` consistently: an interrupt arriving
during the backoff `sleep` (not just during the bucket operation) now
restores the thread's interrupt status and fails fast, instead of
escaping as a raw `InterruptedException` with the interrupt flag lost.
- Leave `LakeFSStorageClient.healthCheck()` on its existing inner retry
(unchanged).
- Add `FileServiceSpec` (8 tests) covering immediate success,
default-argument success, retry-then-success, the full backoff
progression to give-up, give-up preserving the cause, `maxAttempts ==
1`, and interrupt-fails-fast for both interrupt points.
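
The backoff arithmetic described above can be checked with a small standalone sketch. `BackoffSchedule`, `delays`, and `totalWaitMillis` are hypothetical names used only for illustration and are not part of this commit; the sketch assumes the delay doubles after each failed attempt and that no sleep follows the final attempt.

```scala
// Illustrative only: these names do not exist in the Texera codebase.
object BackoffSchedule {

  /** Delays (in ms) slept between attempts: maxAttempts - 1 entries, doubling each time. */
  def delays(maxAttempts: Int, initialDelayMillis: Long): List[Long] =
    List.iterate(initialDelayMillis, math.max(maxAttempts - 1, 0))(_ * 2)

  /** Worst-case time spent sleeping, excluding the time taken by the operation itself. */
  def totalWaitMillis(maxAttempts: Int, initialDelayMillis: Long): Long =
    delays(maxAttempts, initialDelayMillis).sum
}
```

With the defaults from this change (6 attempts from 200ms), the schedule is 200, 400, 800, 1600, 3200, which sums to 6200ms and matches the "~6s total" figure. The figure covers sleeping only, so the wall-clock time before giving up also depends on how long each failed bucket call takes.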
### Any related issues, documentation, discussions?
Closes: #5852
Note: `awaitDependency` is a near-duplicate of
`LakeFSStorageClient.retryWithBackoff` in `common/workflow-core`.
Extracting a single shared helper that both delegate to is the cleaner
end state, but it would refactor a stable, separately-tested class in
another module, so it is deferred to a follow-up rather than widening
the scope of this startup-race fix.
### How was this PR tested?
- Run `sbt "FileService/testOnly
org.apache.texera.service.FileServiceSpec"` and expect 8 passing tests:
- immediate success runs the operation once and never sleeps;
- default-argument success returns on the first try without invoking the
default `Thread.sleep` backoff;
- retry-then-success records delays `List(200, 400)` before succeeding
on the 3rd try;
- exhausting all 6 attempts records the full progression `List(200, 400,
800, 1600, 3200)` before giving up;
- give-up rethrows after `maxAttempts` with the original exception as
`getCause` and the dependency name in the message;
- `maxAttempts == 1` gives up after a single attempt without sleeping;
- an interrupt while running the operation restores the interrupt flag
and fails fast;
- an interrupt while sleeping between attempts likewise restores the
interrupt flag and fails fast.
- This environment hits a pre-existing JaCoCo instrumentation error
(`Unsupported class file major version 69`) because JaCoCo 0.8.11 cannot
instrument JDK 25 class files; this is unrelated to the change. The spec
was verified locally against a JDK 17 toolchain (`sbt -java-home
<jdk17>`, 8/8 pass) and relies on CI's JDK/JaCoCo combo for the standard
instrumented run. `scalafmtCheck` is clean for both main and test
sources.
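
For the two interrupt cases listed above, the following minimal sketch shows the restore-and-fail-fast pattern in isolation, and why a test can confirm it with `Thread.interrupted()`. `InterruptRestoreSketch` and its methods are hypothetical names for illustration, not code from this commit.

```scala
// Illustrative only: a standalone version of the restore-and-fail-fast pattern.
object InterruptRestoreSketch {

  /** Runs `body`; on InterruptedException, re-sets the interrupt flag and wraps the exception. */
  def failFastOnInterrupt(body: => Unit): Unit =
    try body
    catch {
      case ie: InterruptedException =>
        // Restore the flag so callers further up the stack can still observe the interrupt.
        Thread.currentThread().interrupt()
        throw new RuntimeException("Interrupted", ie)
    }

  /** Returns the interrupt flag as seen by two consecutive Thread.interrupted() calls. */
  def observeFlagAfterInterrupt(): (Boolean, Boolean) = {
    try failFastOnInterrupt(throw new InterruptedException("simulated"))
    catch { case _: RuntimeException => () }
    val first = Thread.interrupted() // reads the flag and clears it
    val second = Thread.interrupted() // flag was already cleared by the previous call
    (first, second)
  }
}
```

Because `Thread.interrupted()` clears the flag as it reads it, asserting on it in a test also leaves the test thread uninterrupted for the specs that run afterwards.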
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF
---
.../org/apache/texera/service/FileService.scala | 56 +++++-
.../apache/texera/service/FileServiceSpec.scala | 189 +++++++++++++++++++++
2 files changed, 243 insertions(+), 2 deletions(-)
diff --git a/file-service/src/main/scala/org/apache/texera/service/FileService.scala b/file-service/src/main/scala/org/apache/texera/service/FileService.scala
index 76d78dfef8..9a2688212d 100644
--- a/file-service/src/main/scala/org/apache/texera/service/FileService.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/FileService.scala
@@ -76,9 +76,13 @@ class FileService extends Application[FileServiceConfiguration] with LazyLogging
)
// check if the texera dataset bucket exists, if not create it
- S3StorageClient.createBucketIfNotExist(StorageConfig.lakefsBucketName)
+ awaitDependency("texera dataset bucket") {
+ S3StorageClient.createBucketIfNotExist(StorageConfig.lakefsBucketName)
+ }
// ensure the large-binary S3 bucket exists before any workflow execution attempts to use it
- S3StorageClient.createBucketIfNotExist(LargeBinaryManager.DEFAULT_BUCKET)
+ awaitDependency("large-binary bucket") {
+ S3StorageClient.createBucketIfNotExist(LargeBinaryManager.DEFAULT_BUCKET)
+ }
// check if we can connect to the lakeFS service
LakeFSStorageClient.healthCheck()
@@ -105,6 +109,54 @@ class FileService extends Application[FileServiceConfiguration] with LazyLogging
// Route request logs through SLF4J, controlled by TEXERA_SERVICE_LOG_LEVEL
RequestLoggingFilter.register(environment.getApplicationContext)
}
+
+ /**
+ * Runs `operation`, retrying with exponential backoff until it succeeds or `maxAttempts` is
+ * reached, to tolerate a slow-to-start object store. The last failure is rethrown as the cause.
+ * `sleep` is injectable for tests. Defaults: 6 attempts from 200ms (200, 400, 800, 1600, 3200), ~6s.
+ */
+ private[service] def awaitDependency(
+ description: String,
+ maxAttempts: Int = 6,
+ initialDelayMillis: Long = 200L,
+ sleep: Long => Unit = Thread.sleep
+ )(operation: => Unit): Unit = {
+ // Restore the interrupt status and fail fast rather than retrying, whether the
+ // interrupt arrives while running `operation` or while sleeping between attempts.
+ def failInterrupted(ie: InterruptedException): Nothing = {
+ Thread.currentThread().interrupt()
+ throw new RuntimeException(s"Interrupted while waiting for $description", ie)
+ }
+
+ var attempt = 1
+ var delayMillis = initialDelayMillis
+ while (true) {
+ try {
+ operation
+ return
+ } catch {
+ case ie: InterruptedException => failInterrupted(ie)
+ case e: Exception =>
+ if (attempt >= maxAttempts) {
+ throw new RuntimeException(
+ s"$description not ready after $maxAttempts attempts: ${e.getMessage}",
+ e
+ )
+ }
+ logger.warn(
+ s"$description not ready (attempt $attempt/$maxAttempts): ${e.getMessage}. " +
+ s"Retrying in ${delayMillis}ms..."
+ )
+ try {
+ sleep(delayMillis)
+ } catch {
+ case ie: InterruptedException => failInterrupted(ie)
+ }
+ attempt += 1
+ delayMillis *= 2
+ }
+ }
+ }
}
object FileService {
diff --git a/file-service/src/test/scala/org/apache/texera/service/FileServiceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/FileServiceSpec.scala
new file mode 100644
index 0000000000..30ec2c23f7
--- /dev/null
+++ b/file-service/src/test/scala/org/apache/texera/service/FileServiceSpec.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable.ListBuffer
+
+class FileServiceSpec extends AnyFlatSpec {
+
+ private val service = new FileService()
+
+ "awaitDependency" should "run the operation once and not sleep when it succeeds immediately" in {
+ var attempts = 0
+ val delays = ListBuffer.empty[Long]
+ service.awaitDependency("dep", 6, 200L, delays += _) {
+ attempts += 1
+ }
+ assert(attempts == 1)
+ assert(delays.isEmpty)
+ }
+
+ it should "run the operation once with the default arguments when it succeeds immediately" in {
+ // Exercises the default maxAttempts/initialDelay/sleep parameters: a first-try success
+ // returns without ever invoking the (real Thread.sleep) default backoff.
+ var attempts = 0
+ service.awaitDependency("dep") {
+ attempts += 1
+ }
+ assert(attempts == 1)
+ }
+
+ it should "retry until success and double the delay after each failed attempt" in {
+ var attempts = 0
+ val delays = ListBuffer.empty[Long]
+ service.awaitDependency("dep", 6, 200L, delays += _) {
+ attempts += 1
+ if (attempts < 3) throw new RuntimeException("not reachable yet")
+ }
+ assert(attempts == 3)
+ assert(delays.toList == List(200L, 400L))
+ }
+
+ it should "double the delay after every failed attempt up to maxAttempts - 1 sleeps" in {
+ var attempts = 0
+ val delays = ListBuffer.empty[Long]
+ val ex = intercept[RuntimeException] {
+ service.awaitDependency("dep", 6, 200L, delays += _) {
+ attempts += 1
+ throw new RuntimeException("down")
+ }
+ }
+ // 6 attempts means 5 backoff waits following the geometric progression from 200ms.
+ assert(attempts == 6)
+ assert(delays.toList == List(200L, 400L, 800L, 1600L, 3200L))
+ assert(ex.getMessage.contains("after 6 attempts"))
+ }
+
+ it should "give up after maxAttempts and preserve the last failure as the cause" in {
+ var attempts = 0
+ val cause = new RuntimeException("still down")
+ val ex = intercept[RuntimeException] {
+ service.awaitDependency("dep", 3, 200L, _ => ()) {
+ attempts += 1
+ throw cause
+ }
+ }
+ assert(attempts == 3)
+ assert(ex.getMessage.contains("after 3 attempts"))
+ assert(ex.getMessage.contains("dep"))
+ assert(ex.getCause eq cause)
+ }
+
+ it should "give up immediately without sleeping when maxAttempts is 1" in {
+ var attempts = 0
+ val delays = ListBuffer.empty[Long]
+ val cause = new RuntimeException("still down")
+ val ex = intercept[RuntimeException] {
+ service.awaitDependency("dep", 1, 200L, delays += _) {
+ attempts += 1
+ throw cause
+ }
+ }
+ assert(attempts == 1)
+ assert(delays.isEmpty)
+ assert(ex.getMessage.contains("after 1 attempts"))
+ assert(ex.getCause eq cause)
+ }
+
+ it should "fail fast and restore the interrupt status when the operation is interrupted" in {
+ val ex = intercept[RuntimeException] {
+ service.awaitDependency("dep", 6, 200L, _ => ()) {
+ throw new InterruptedException("interrupted")
+ }
+ }
+ // Thread.interrupted() both reads and clears the flag, so the interrupt was restored.
+ assert(Thread.interrupted())
+ assert(ex.getMessage.contains("Interrupted while waiting for dep"))
+ assert(ex.getCause.isInstanceOf[InterruptedException])
+ }
+
+ it should "fail fast and restore the interrupt status when interrupted while sleeping between attempts" in {
+ var attempts = 0
+ val ex = intercept[RuntimeException] {
+ service.awaitDependency("dep", 6, 200L, _ => throw new InterruptedException("interrupted")) {
+ attempts += 1
+ throw new RuntimeException("not reachable yet")
+ }
+ }
+ // The operation failed once, then the interrupt arrived during the backoff sleep.
+ assert(attempts == 1)
+ // Thread.interrupted() both reads and clears the flag, so the interrupt was restored.
+ assert(Thread.interrupted())
+ assert(ex.getMessage.contains("Interrupted while waiting for dep"))
+ assert(ex.getCause.isInstanceOf[InterruptedException])
+ }
+
+ it should "succeed on the final allowed attempt without giving up one try too early" in {
+ // Boundary for `attempt >= maxAttempts`: the operation only succeeds on the very last
+ // attempt, so the loop must not give up prematurely. Expect maxAttempts - 1 backoff waits.
+ var attempts = 0
+ val delays = ListBuffer.empty[Long]
+ service.awaitDependency("dep", 3, 200L, delays += _) {
+ attempts += 1
+ if (attempts < 3) throw new RuntimeException("not reachable yet")
+ }
+ assert(attempts == 3)
+ assert(delays.toList == List(200L, 400L))
+ }
+
+ it should "honor a custom initial delay when computing the backoff progression" in {
+ // Guards against the initial delay being hardcoded: starting from 50ms the geometric
+ // progression must be 50, 100, 200 rather than the default 200-based sequence.
+ var attempts = 0
+ val delays = ListBuffer.empty[Long]
+ val ex = intercept[RuntimeException] {
+ service.awaitDependency("dep", 4, 50L, delays += _) {
+ attempts += 1
+ throw new RuntimeException("down")
+ }
+ }
+ assert(attempts == 4)
+ assert(delays.toList == List(50L, 100L, 200L))
+ assert(ex.getMessage.contains("after 4 attempts"))
+ }
+
+ it should "include the underlying failure message when giving up" in {
+ val ex = intercept[RuntimeException] {
+ service.awaitDependency("dataset bucket", 2, 200L, _ => ()) {
+ throw new RuntimeException("connection refused")
+ }
+ }
+ assert(ex.getMessage.contains("dataset bucket not ready after 2 attempts"))
+ assert(ex.getMessage.contains("connection refused"))
+ }
+
+ it should "propagate a non-Exception Throwable immediately without retrying or wrapping it" in {
+ // The catch clause only matches Exception, so an Error must escape on the first attempt:
+ // it is neither retried nor wrapped in the \"not ready after N attempts\" RuntimeException.
+ var attempts = 0
+ val delays = ListBuffer.empty[Long]
+ val err = intercept[StackOverflowError] {
+ service.awaitDependency("dep", 6, 200L, delays += _) {
+ attempts += 1
+ throw new StackOverflowError("boom")
+ }
+ }
+ assert(attempts == 1)
+ assert(delays.isEmpty)
+ assert(err.getMessage == "boom")
+ }
+}