This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5647-3242ac9e3f29d5ceac95443dde39a3aa81bfe3dd
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 263093aaa54b34996013c717e4bdf7f8902230d0
Author: Matthew B. <[email protected]>
AuthorDate: Sun Jun 14 20:25:46 2026 -0700

    fix(file-service): retry LakeFS health check on startup (#5647)
    
    ### What changes were proposed in this PR?
    - `LakeFSStorageClient.healthCheck()` now retries the LakeFS health
    check with **exponential backoff** before failing, so a transient
    `Connection reset` during concurrent startup no longer crashes the
    file-service.
    - Retry policy: 5 attempts with the delay doubling each time, starting
    at 200ms (200, 400, 800, 1600ms), capping total wait at ~3s.
    - The retry logic is factored into a small, reusable `retryWithBackoff`
    helper with an injectable `sleep` function so the backoff can be
    unit-tested without real waiting.
    - Each failed attempt logs a warning; if LakeFS is still unreachable
    after all attempts, startup fails with a clear aggregated error
    (preserving the last exception as the cause) so a genuine outage is
    still surfaced.
    - An `InterruptedException` while waiting restores the thread's
    interrupt status and fails fast instead of retrying.
    - Added `LakeFSStorageClientSpec` covering immediate success,
    retry-then-success with doubling delays, give-up-after-max-attempts
    (with cause preserved), and interrupt handling.
    
    ### Any related issues, documentation, discussions?
    Closes: #5646
    
    ### How was this PR tested?
    - Unit tests: `LakeFSStorageClientSpec` exercises the backoff behavior
    (delay sequence, max-attempt failure, interrupt handling) using an
    injected `sleep`.
    - Reproduce the race: stop LakeFS, start file-service, then start LakeFS
    within a few seconds; confirm file-service logs retry warnings and then
    starts successfully instead of exiting.
    - Regression with LakeFS already up: start file-service, confirm it
    binds `:9092` and `curl http://127.0.0.1:9092/api/healthcheck` returns
    HTTP 200.
    - Compile: run `sbt "FileService/compile"` and expect `[success]`.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    Co-authored with Claude Opus 4.8 in compliance with ASF
---
 .../core/storage/util/LakeFSStorageClient.scala    | 55 ++++++++++++++--
 .../storage/util/LakeFSStorageClientSpec.scala     | 73 ++++++++++++++++++++++
 2 files changed, 123 insertions(+), 5 deletions(-)

diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
index cb66ea3e4f..b6aba648d7 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
@@ -19,6 +19,7 @@
 
 package org.apache.texera.amber.core.storage.util
 
+import com.typesafe.scalalogging.LazyLogging
 import io.lakefs.clients.sdk._
 import io.lakefs.clients.sdk.model.ResetCreation.TypeEnum
 import io.lakefs.clients.sdk.model._
@@ -33,11 +34,16 @@ import scala.jdk.CollectionConverters._
   * LakeFSFileStorage provides high-level file storage operations using LakeFS,
   * similar to Git operations for version control and file management.
   */
-object LakeFSStorageClient {
+object LakeFSStorageClient extends LazyLogging {
 
   // Maximum number of results per LakeFS API request (pagination page size)
   private val PageSize = 1000
 
+  // Health-check retry settings: retry with exponential backoff before giving up.
+  // 5 attempts starting at 200ms (200, 400, 800, 1600ms) caps total wait at ~3s.
+  private val HealthCheckMaxAttempts = 5
+  private val HealthCheckInitialDelayMillis = 200L
+
   private lazy val apiClient: ApiClient = {
     val client = new ApiClient()
     client.setApiKey(StorageConfig.lakefsPassword)
@@ -69,11 +75,50 @@ object LakeFSStorageClient {
   private val branchName: String = "main"
 
   def healthCheck(): Unit = {
-    try {
+    retryWithBackoff(HealthCheckMaxAttempts, HealthCheckInitialDelayMillis) {
       this.healthCheckApi.healthCheck().execute()
-    } catch {
-      case e: Exception =>
-        throw new RuntimeException(s"Failed to connect to lake fs server: ${e.getMessage}")
+    }
+  }
+
+  /**
+    * Runs `operation`, retrying on failure with exponential backoff (the delay
+    * doubles after each failed attempt) until it succeeds or `maxAttempts` is
+    * reached. The final failure is rethrown with the last exception as its cause.
+    * If interrupted while waiting, restores the interrupt status and fails fast.
+    *
+    * `sleep` is injectable so the backoff can be exercised in tests without real waiting.
+    */
+  private[util] def retryWithBackoff(
+      maxAttempts: Int,
+      initialDelayMillis: Long,
+      sleep: Long => Unit = Thread.sleep
+  )(operation: => Unit): Unit = {
+    var attempt = 1
+    var delayMillis = initialDelayMillis
+    while (true) {
+      try {
+        operation
+        return
+      } catch {
+        case ie: InterruptedException =>
+          // Restore the interrupt status and fail fast rather than retrying.
+          Thread.currentThread().interrupt()
+          throw new RuntimeException("Interrupted while waiting to retry lake fs health check", ie)
+        case e: Exception =>
+          if (attempt >= maxAttempts) {
+            throw new RuntimeException(
+              s"Failed to connect to lake fs server after $maxAttempts attempts: ${e.getMessage}",
+              e
+            )
+          }
+          logger.warn(
+            s"LakeFS not reachable (attempt $attempt/$maxAttempts): ${e.getMessage}. " +
+              s"Retrying in ${delayMillis}ms..."
+          )
+          sleep(delayMillis)
+          attempt += 1
+          delayMillis *= 2
+      }
     }
   }
 
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClientSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClientSpec.scala
new file mode 100644
index 0000000000..56506cee1f
--- /dev/null
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClientSpec.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.storage.util
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable.ListBuffer
+
+class LakeFSStorageClientSpec extends AnyFlatSpec {
+
+  "retryWithBackoff" should "run the operation once and not sleep when it succeeds immediately" in {
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    LakeFSStorageClient.retryWithBackoff(5, 200L, delays += _) {
+      attempts += 1
+    }
+    assert(attempts == 1)
+    assert(delays.isEmpty)
+  }
+
+  it should "retry until success and double the delay after each failed attempt" in {
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    LakeFSStorageClient.retryWithBackoff(5, 200L, delays += _) {
+      attempts += 1
+      if (attempts < 3) throw new RuntimeException("transient")
+    }
+    assert(attempts == 3)
+    assert(delays.toList == List(200L, 400L))
+  }
+
+  it should "give up after maxAttempts and preserve the last failure as the cause" in {
+    var attempts = 0
+    val cause = new RuntimeException("still down")
+    val ex = intercept[RuntimeException] {
+      LakeFSStorageClient.retryWithBackoff(3, 200L, _ => ()) {
+        attempts += 1
+        throw cause
+      }
+    }
+    assert(attempts == 3)
+    assert(ex.getMessage.contains("after 3 attempts"))
+    assert(ex.getCause eq cause)
+  }
+
+  it should "fail fast and restore the interrupt status when interrupted" in {
+    val ex = intercept[RuntimeException] {
+      LakeFSStorageClient.retryWithBackoff(5, 200L, _ => ()) {
+        throw new InterruptedException("interrupted")
+      }
+    }
+    // Thread.interrupted() both reads and clears the flag, so the interrupt was restored.
+    assert(Thread.interrupted())
+    assert(ex.getCause.isInstanceOf[InterruptedException])
+  }
+}

Reply via email to