This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c099104  [SPARK-55905] Exponential backoff for 
getOrCreateSecondaryResource attempt
c099104 is described below

commit c0991045e5fcb1cc335c1652af2dbf38f5b36a7d
Author: Qi Tan <[email protected]>
AuthorDate: Thu Mar 12 13:10:43 2026 -0700

    [SPARK-55905] Exponential backoff for getOrCreateSecondaryResource attempt
    
    ### What changes were proposed in this pull request?
    [SPARK-55905] Exponential backoff for getOrCreateSecondaryResource attempt
    
    ### Why are the changes needed?
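    Optimize retry: wait with exponential backoff and jitter before retrying
    `getOrCreateSecondaryResource` on 409 (Conflict) and 429 (Too Many Requests) responses.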
    
    ### Does this PR introduce _any_ user-facing change?
    yes, new backoff configuration properties are added (see `docs/config_properties.md`).
    
    ### How was this patch tested?
    unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #542 from TQJADE/SPARK-55905.
    
    Lead-authored-by: Qi Tan <[email protected]>
    Co-authored-by: Qi Tan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 docs/config_properties.md                          |  6 +-
 .../org/apache/spark/k8s/operator/Constants.java   |  3 +
 .../k8s/operator/config/SparkOperatorConf.java     | 67 ++++++++++++++++-
 .../spark/k8s/operator/utils/BackoffUtils.java     | 75 +++++++++++++++++++
 .../spark/k8s/operator/utils/ReconcilerUtils.java  | 13 ++++
 .../reconciler/SparkAppReconcilerTest.java         | 85 ++++++++++++++++++++++
 6 files changed, 245 insertions(+), 4 deletions(-)

diff --git a/docs/config_properties.md b/docs/config_properties.md
index 2edeeda..f7040a2 100644
--- a/docs/config_properties.md
+++ b/docs/config_properties.md
@@ -4,7 +4,11 @@
  | --- | --- | --- | --- | --- | 
  | spark.kubernetes.operator.api.retryAttemptAfterSeconds | Long | 1 | false | 
Default time (in seconds) to wait till next request. This would be used if 
server does not set Retry-After in response. Setting this to non-positive 
number means immediate retry. | 
  | spark.kubernetes.operator.api.retryMaxAttempts | Integer | 15 | false | Max 
attempts of retries on unhandled controller errors. Setting this to 
non-positive value means no retry. | 
- | spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts | Long | 3 
| false | Maximal number of retry attempts of requesting secondary resource for 
Spark application. This would be performed on top of k8s client 
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting 
reconcile on the same SparkApplication, as well as API server errors 
(408/500/502/503/504) and network-level timeouts. This should be positive 
number. | 
+ | spark.kubernetes.operator.api.secondaryResourceCreateBackoffJitterMillis | 
Long | 500 | false | Jitter (in milliseconds) added to backoff delay between 
retries when creating secondary resources for Spark application. | 
+ | spark.kubernetes.operator.api.secondaryResourceCreateBackoffMultiplier | 
Double | 2.0 | false | Backoff multiplier applied to each retry interval when 
creating secondary resources for Spark application. | 
+ | spark.kubernetes.operator.api.secondaryResourceCreateInitialBackoffMillis | 
Long | 1000 | false | Initial backoff (in milliseconds) between retries when 
creating secondary resources for Spark application. Backoff is applied only on 
409 (Conflict) and 429 (Too Many Requests) responses. | 
+ | spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts | Long | 3 
| false | Maximal number of retry attempts of requesting secondary resource for 
Spark application. This would be performed on top of k8s client 
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting 
reconcile on the same SparkApplication, as well as API server errors 
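(408/500/502/503/504) and network-level timeouts. Exponential backoff with 
jitter is applied before retrying on 409 (Conflict) and 429 (Too Many Requests) 
responses. Other retryable errors are retried immediately. This should be 
positive number. | 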
+ | spark.kubernetes.operator.api.secondaryResourceCreateMaxBackoffMillis | 
Long | 40000 | false | Maximum backoff (in milliseconds) between retries when 
creating secondary resources for Spark application. | 
  | spark.kubernetes.operator.api.statusPatchMaxAttempts | Long | 3 | false | 
Maximal number of retry attempts of requests to k8s server for resource status 
update. This would be performed on top of k8s client 
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting 
update on the same SparkApplication. This should be positive number. | 
  | spark.kubernetes.operator.dynamicConfig.enabled | Boolean | false | false | 
When enabled, operator would use config map as source of truth for config 
property override. The config map need to be created in 
spark.kubernetes.operator.namespace, and labeled with operator name. | 
  | spark.kubernetes.operator.dynamicConfig.reconcilerParallelism | Integer | 1 
| false | Parallelism for dynamic config reconciler. Unbounded pool would be 
used if set to non-positive number. | 
diff --git 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
index 898286e..1934946 100644
--- 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
+++ 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
@@ -225,4 +225,7 @@ public class Constants {
 
   /** Message indicating that the cluster status cannot be processed. */
   public static final String UNKNOWN_CLUSTER_STATE_MESSAGE = "Cannot process 
cluster status.";
+
+  /** HTTP 429 Too Many Requests (not defined in {@link 
java.net.HttpURLConnection}). */
+  public static final int HTTP_TOO_MANY_REQUESTS = 429;
 }
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
index 9b8c54f..51b2710 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
@@ -381,7 +381,10 @@ public final class SparkOperatorConf {
    * Maximal number of retry attempts of requesting secondary resource for 
Spark application. This
    * would be performed on top of k8s client 
spark.kubernetes.operator.retry.maxAttempts to overcome
    * potential conflicting reconcile on the same SparkApplication, as well as 
transient API server
-   * errors and network-level timeouts. This should be positive number
+   * errors and network-level timeouts. Exponential backoff with jitter is 
applied before retrying
+   * on 409 (Conflict) and 429 (Too Many Requests) responses. Other retryable 
errors
+   * (408/500/502/503/504 and network timeouts) are retried immediately. This should be a
+   * positive number.
    */
   public static final ConfigOption<Long> 
API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS =
       ConfigOption.<Long>builder()
@@ -392,8 +395,10 @@ public final class SparkOperatorConf {
                   + "application. This would be performed on top of k8s client 
"
                   + "spark.kubernetes.operator.retry.maxAttempts to overcome 
potential "
                   + "conflicting reconcile on the same SparkApplication, as 
well as API "
-                  + "server errors (408/500/502/503/504) and network-level 
timeouts. This should "
-                  + "be positive number.")
+                  + "server errors (408/500/502/503/504) and network-level 
timeouts. "
+                  + "Exponential backoff with jitter is applied before 
retrying on "
+                  + "409 (Conflict) and 429 (Too Many Requests) responses. 
Other retryable "
+                  + "errors are retried immediately. This should be positive 
number.")
           .typeParameterClass(Long.class)
           .defaultValue(3L)
           .build();
@@ -592,6 +597,62 @@ public final class SparkOperatorConf {
           .defaultValue(5)
           .build();
 
+  /**
+   * Initial backoff (in milliseconds) between retries when creating secondary resources.
+   * Backoff is applied only on 409 (Conflict) and 429 (Too Many Requests) responses.
+   *
+   * <p>Defaults to 1000 ms. Dynamic override is disabled for this option.
+   */
+  public static final ConfigOption<Long> 
API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS =
+      ConfigOption.<Long>builder()
+          
.key("spark.kubernetes.operator.api.secondaryResourceCreateInitialBackoffMillis")
+          .enableDynamicOverride(false)
+          .description(
+              "Initial backoff (in milliseconds) between retries "
+                  + "when creating secondary resources for Spark application. "
+                  + "Backoff is applied only on 409 (Conflict) and "
+                  + "429 (Too Many Requests) responses.")
+          .typeParameterClass(Long.class)
+          .defaultValue(1000L)
+          .build();
+
+  /**
+   * Maximum backoff (in milliseconds) between retries when creating secondary resources.
+   *
+   * <p>Defaults to 40000 ms. Dynamic override is disabled for this option.
+   */
+  public static final ConfigOption<Long> 
API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS =
+      ConfigOption.<Long>builder()
+          
.key("spark.kubernetes.operator.api.secondaryResourceCreateMaxBackoffMillis")
+          .enableDynamicOverride(false)
+          .description(
+              "Maximum backoff (in milliseconds) between retries when creating 
"
+                  + "secondary resources for Spark application.")
+          .typeParameterClass(Long.class)
+          .defaultValue(40000L)
+          .build();
+
+  /**
+   * Backoff multiplier applied to each retry interval when creating secondary resources.
+   *
+   * <p>Defaults to 2.0. Dynamic override is disabled for this option.
+   */
+  public static final ConfigOption<Double> 
API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER =
+      ConfigOption.<Double>builder()
+          
.key("spark.kubernetes.operator.api.secondaryResourceCreateBackoffMultiplier")
+          .enableDynamicOverride(false)
+          .description(
+              "Backoff multiplier applied to each retry interval when creating 
"
+                  + "secondary resources for Spark application.")
+          .typeParameterClass(Double.class)
+          .defaultValue(2.0)
+          .build();
+
+  /**
+   * Jitter (in milliseconds) added to backoff delay between retries when creating secondary
+   * resources.
+   *
+   * <p>Defaults to 500 ms. Dynamic override is disabled for this option.
+   */
+  public static final ConfigOption<Long> 
API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS =
+      ConfigOption.<Long>builder()
+          
.key("spark.kubernetes.operator.api.secondaryResourceCreateBackoffJitterMillis")
+          .enableDynamicOverride(false)
+          .description(
+              "Jitter (in milliseconds) added to backoff delay between retries 
when "
+                  + "creating secondary resources for Spark application.")
+          .typeParameterClass(Long.class)
+          .defaultValue(500L)
+          .build();
+
   private SparkOperatorConf() {}
 
   /**
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/BackoffUtils.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/BackoffUtils.java
new file mode 100644
index 0000000..553759b
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/BackoffUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import static org.apache.spark.k8s.operator.config.SparkOperatorConf.*;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import lombok.extern.slf4j.Slf4j;
+
+/** Utility class for exponential backoff with jitter on retryable API errors. */
+@Slf4j
+public final class BackoffUtils {
+
+  private BackoffUtils() {}
+
+  /**
+   * Sleeps with jittered exponential backoff before retrying resource creation.
+   *
+   * <p>The delay is computed from the secondary-resource backoff configuration options
+   * (initial backoff, max backoff, multiplier and jitter). It is never negative, so a
+   * misconfigured negative backoff results in no wait instead of an
+   * {@link IllegalArgumentException} from {@link Thread#sleep(long)}. If the thread is
+   * interrupted while sleeping, the interrupt flag is restored and the method returns early.
+   *
+   * @param responseCode HTTP response code of the failed request; used for logging only
+   * @param attemptCount attempt counter supplied by the caller; a value of 2 yields the
+   *     initial backoff
+   * @param maxAttempts maximum number of attempts; used for logging only
+   * @see <a href="https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/">
+   *     Exponential Backoff and Jitter</a>
+   */
+  public static void backoffSleep(int responseCode, long attemptCount, long maxAttempts) {
+    long initialBackoffMs = API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS.getValue();
+    long maxBackoffMs = API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS.getValue();
+    double backoffMultiplier = API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER.getValue();
+    long jitterMs = API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS.getValue();
+    long actualDelay =
+        computeBackoffDelay(
+            initialBackoffMs, backoffMultiplier, attemptCount, maxBackoffMs, jitterMs);
+    log.info(
+        "Retrying resource creation with exponential backoff. "
+            + "Delay: {}ms, responseCode: {}, attempt: {}/{}",
+        actualDelay,
+        responseCode,
+        attemptCount,
+        maxAttempts);
+    try {
+      Thread.sleep(actualDelay);
+    } catch (InterruptedException ex) {
+      log.info("Backoff sleep interrupted, waking up early to retry resource creation");
+      // Preserve the interrupt for callers further up the stack.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Computes the delay for one retry: the exponential delay capped at {@code maxBackoffMs},
+   * plus a random jitter in {@code [0, jitterMs]}.
+   *
+   * @param initialBackoffMs base delay in milliseconds
+   * @param backoffMultiplier factor raised to the power {@code attemptCount - 2}
+   * @param attemptCount attempt counter; 2 yields exactly {@code initialBackoffMs} before
+   *     capping
+   * @param maxBackoffMs upper bound for the exponential part, in milliseconds
+   * @param jitterMs maximum jitter in milliseconds; non-positive values disable jitter
+   * @return a non-negative delay in milliseconds, saturating at {@link Long#MAX_VALUE}
+   */
+  static long computeBackoffDelay(
+      long initialBackoffMs,
+      double backoffMultiplier,
+      long attemptCount,
+      long maxBackoffMs,
+      long jitterMs) {
+    // The double-to-long cast saturates at Long.MAX_VALUE, so large exponents cannot wrap.
+    long exponentialDelay =
+        (long) (initialBackoffMs * Math.pow(backoffMultiplier, attemptCount - 2));
+    // Clamp at zero: Thread.sleep rejects negative values.
+    long cappedDelay = Math.max(0L, Math.min(exponentialDelay, maxBackoffMs));
+    if (jitterMs <= 0) {
+      return cappedDelay;
+    }
+    // nextLong's bound is exclusive; avoid overflowing jitterMs + 1 at Long.MAX_VALUE.
+    long jitterBound = jitterMs == Long.MAX_VALUE ? Long.MAX_VALUE : jitterMs + 1;
+    long jitter = ThreadLocalRandom.current().nextLong(0, jitterBound);
+    // Saturate instead of wrapping to a negative delay.
+    return jitter > Long.MAX_VALUE - cappedDelay ? Long.MAX_VALUE : cappedDelay + jitter;
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
index b042f4b..0896b70 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
@@ -48,6 +48,7 @@ import 
io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.spark.k8s.operator.BaseResource;
+import org.apache.spark.k8s.operator.Constants;
 import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
 
 /** Utility class for reconciler operations. */
@@ -132,6 +133,8 @@ public final class ReconcilerUtils {
                 return current;
               }
             }
+          } else if (e.getCode() == Constants.HTTP_TOO_MANY_REQUESTS) {
+            log.debug("Server returned 429 Too Many Requests, will retry with 
backoff");
           } else if (isTransientError(e) || e.getCode() == 
HTTP_INTERNAL_ERROR) {
             // GET to avoid duplicate create attempt for timeouts (0) and 
transient 5xx
             current = getResource(client, resource);
@@ -145,6 +148,9 @@ public final class ReconcilerUtils {
             log.error("Max Retries exceeded while trying to create resource");
             throw e;
           }
+          if (shouldBackoffBeforeRetry(e)) {
+            BackoffUtils.backoffSleep(e.getCode(), attemptCount, maxAttempts);
+          }
         }
       }
     }
@@ -222,6 +228,13 @@ public final class ReconcilerUtils {
     }
   }
 
+  private static boolean shouldBackoffBeforeRetry(KubernetesClientException e) 
{
+    return switch (e.getCode()) {
+      case HTTP_CONFLICT, Constants.HTTP_TOO_MANY_REQUESTS -> true;
+      default -> false;
+    };
+  }
+
   private static boolean isTransientError(KubernetesClientException e) {
     // code 0 is fabric8's sentinel for network-level failures (timeouts, 
connection resets, etc.)
     return switch (e.getCode()) {
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
index f9820a9..5164101 100644
--- 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
@@ -19,6 +19,13 @@
 
 package org.apache.spark.k8s.operator.reconciler;
 
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
+import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS;
+import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER;
+import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS;
+import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS;
+import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS;
+import static org.apache.spark.k8s.operator.utils.TestUtils.setConfigKey;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,13 +34,18 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.List;
 import java.util.Optional;
 
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.dsl.NamespaceableResource;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 import org.junit.jupiter.api.BeforeEach;
@@ -173,4 +185,77 @@ class SparkAppReconcilerTest {
       utils.verifyNoInteractions();
     }
   }
+
+    /**
+     * Creates a mocked resource handle and makes {@code mockClient.resource(pod)} return it.
+     *
+     * @param pod the pod the client is expected to be asked about
+     * @return the mocked handle, so tests can stub {@code get()} and {@code create()} on it
+     */
+    @SuppressWarnings("unchecked")
+    private NamespaceableResource<Pod> mockSecondaryResource(Pod pod) {
+        NamespaceableResource<Pod> resourceHandle = mock(NamespaceableResource.class);
+        when(mockClient.resource(pod)).thenReturn(resourceHandle);
+        return resourceHandle;
+    }
+
+    private void setFastBackoff() {
+        setConfigKey(API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS, 3L);
+        setConfigKey(API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS, 
5000L);
+        setConfigKey(API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS, 10000L);
+        setConfigKey(API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS, 0L);
+        setConfigKey(API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER, 2.0);
+    }
+
+    private void restoreBackoffDefaults(
+            long maxAttempts,
+            long initialBackoff,
+            long maxBackoff,
+            long jitter,
+            double multiplier) {
+        setConfigKey(API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS, maxAttempts);
+        setConfigKey(API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS, 
initialBackoff);
+        setConfigKey(API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS, 
maxBackoff);
+        setConfigKey(API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS, 
jitter);
+        setConfigKey(API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER, 
multiplier);
+    }
+
+    private Pod buildTestPod() {
+        return new PodBuilder()
+                .withNewMetadata()
+                .withName("test-pod")
+                .withNamespace("default")
+                .endMetadata()
+                .build();
+    }
+
+    @Test
+    void secondaryResourceCreationRetriesWithBackoffOnConflict() {
+        long savedMaxAttempts = 
API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS.getValue();
+        long savedInitialBackoff = 
API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS.getValue();
+        long savedMaxBackoff = 
API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS.getValue();
+        long savedJitter = 
API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS.getValue();
+        double savedMultiplier = 
API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER.getValue();
+        try {
+            setFastBackoff();
+            Pod pod = buildTestPod();
+            NamespaceableResource<Pod> mockResource = 
mockSecondaryResource(pod);
+            KubernetesClientException conflict =
+                    new KubernetesClientException("conflict", HTTP_CONFLICT, 
null);
+            when(mockResource.get()).thenReturn(null);
+            when(mockResource.create()).thenThrow(conflict).thenReturn(pod);
+
+            long start = System.currentTimeMillis();
+            Optional<Pod> result = 
ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod);
+            long elapsed = System.currentTimeMillis() - start;
+
+            assertTrue(result.isPresent());
+            verify(mockResource, times(2)).create();
+            assertTrue(elapsed >= 5000L,
+                    "Expected backoff delay of at least "
+                            + "5000ms, but was: "
+                            + elapsed + "ms");
+        } finally {
+            restoreBackoffDefaults(
+                    savedMaxAttempts,
+                    savedInitialBackoff,
+                    savedMaxBackoff,
+                    savedJitter,
+                    savedMultiplier);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to