This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c099104 [SPARK-55905] Exponential backoff for
getOrCreateSecondaryResource attempt
c099104 is described below
commit c0991045e5fcb1cc335c1652af2dbf38f5b36a7d
Author: Qi Tan <[email protected]>
AuthorDate: Thu Mar 12 13:10:43 2026 -0700
[SPARK-55905] Exponential backoff for getOrCreateSecondaryResource attempt
### What changes were proposed in this pull request?
[SPARK-55905] Exponential backoff for getOrCreateSecondaryResource attempt
### Why are the changes needed?
Optimize Retry
### Does this PR introduce _any_ user-facing change?
yes
### How was this patch tested?
unit test
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #542 from TQJADE/SPARK-55905.
Lead-authored-by: Qi Tan <[email protected]>
Co-authored-by: Qi Tan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
docs/config_properties.md | 6 +-
.../org/apache/spark/k8s/operator/Constants.java | 3 +
.../k8s/operator/config/SparkOperatorConf.java | 67 ++++++++++++++++-
.../spark/k8s/operator/utils/BackoffUtils.java | 75 +++++++++++++++++++
.../spark/k8s/operator/utils/ReconcilerUtils.java | 13 ++++
.../reconciler/SparkAppReconcilerTest.java | 85 ++++++++++++++++++++++
6 files changed, 245 insertions(+), 4 deletions(-)
diff --git a/docs/config_properties.md b/docs/config_properties.md
index 2edeeda..f7040a2 100644
--- a/docs/config_properties.md
+++ b/docs/config_properties.md
@@ -4,7 +4,11 @@
| --- | --- | --- | --- | --- |
| spark.kubernetes.operator.api.retryAttemptAfterSeconds | Long | 1 | false |
Default time (in seconds) to wait till next request. This would be used if
server does not set Retry-After in response. Setting this to non-positive
number means immediate retry. |
| spark.kubernetes.operator.api.retryMaxAttempts | Integer | 15 | false | Max
attempts of retries on unhandled controller errors. Setting this to
non-positive value means no retry. |
- | spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts | Long | 3
| false | Maximal number of retry attempts of requesting secondary resource for
Spark application. This would be performed on top of k8s client
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting
reconcile on the same SparkApplication, as well as API server errors
(408/500/502/503/504) and network-level timeouts. This should be positive
number. |
+ | spark.kubernetes.operator.api.secondaryResourceCreateBackoffJitterMillis |
Long | 500 | false | Jitter (in milliseconds) added to backoff delay between
retries when creating secondary resources for Spark application. |
+ | spark.kubernetes.operator.api.secondaryResourceCreateBackoffMultiplier |
Double | 2.0 | false | Backoff multiplier applied to each retry interval when
creating secondary resources for Spark application. |
+ | spark.kubernetes.operator.api.secondaryResourceCreateInitialBackoffMillis |
Long | 1000 | false | Initial backoff (in milliseconds) between retries when
creating secondary resources for Spark application. Backoff is applied only on
409 (Conflict) and 429 (Too Many Requests) responses. |
+ | spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts | Long | 3
| false | Maximal number of retry attempts of requesting secondary resource for
Spark application. This would be performed on top of k8s client
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting
reconcile on the same SparkApplication, as well as API server errors
(408/500/502/503/504) and network-level timeouts. Exponential backoff with
jitter is applied before retrying on 409 (Confl [...]
+ | spark.kubernetes.operator.api.secondaryResourceCreateMaxBackoffMillis |
Long | 40000 | false | Maximum backoff (in milliseconds) between retries when
creating secondary resources for Spark application. |
| spark.kubernetes.operator.api.statusPatchMaxAttempts | Long | 3 | false |
Maximal number of retry attempts of requests to k8s server for resource status
update. This would be performed on top of k8s client
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting
update on the same SparkApplication. This should be positive number. |
| spark.kubernetes.operator.dynamicConfig.enabled | Boolean | false | false |
When enabled, operator would use config map as source of truth for config
property override. The config map need to be created in
spark.kubernetes.operator.namespace, and labeled with operator name. |
| spark.kubernetes.operator.dynamicConfig.reconcilerParallelism | Integer | 1
| false | Parallelism for dynamic config reconciler. Unbounded pool would be
used if set to non-positive number. |
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
index 898286e..1934946 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
@@ -225,4 +225,7 @@ public class Constants {
/** Message indicating that the cluster status cannot be processed. */
public static final String UNKNOWN_CLUSTER_STATE_MESSAGE = "Cannot process
cluster status.";
+
+ /** HTTP 429 Too Many Requests (not defined in {@link
java.net.HttpURLConnection}). */
+ public static final int HTTP_TOO_MANY_REQUESTS = 429;
}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
index 9b8c54f..51b2710 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
@@ -381,7 +381,10 @@ public final class SparkOperatorConf {
* Maximal number of retry attempts of requesting secondary resource for
Spark application. This
* would be performed on top of k8s client
spark.kubernetes.operator.retry.maxAttempts to overcome
* potential conflicting reconcile on the same SparkApplication, as well as
transient API server
- * errors and network-level timeouts. This should be positive number
+ * errors and network-level timeouts. Exponential backoff with jitter is
applied before retrying
+ * on 409 (Conflict) and 429 (Too Many Requests) responses. Other retryable
errors
+ * (408/500/502/503/504 and network timeouts) are retried immediately. This
should be positive
+ * number
*/
public static final ConfigOption<Long>
API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS =
ConfigOption.<Long>builder()
@@ -392,8 +395,10 @@ public final class SparkOperatorConf {
+ "application. This would be performed on top of k8s client
"
+ "spark.kubernetes.operator.retry.maxAttempts to overcome
potential "
+ "conflicting reconcile on the same SparkApplication, as
well as API "
- + "server errors (408/500/502/503/504) and network-level
timeouts. This should "
- + "be positive number.")
+ + "server errors (408/500/502/503/504) and network-level
timeouts. "
+ + "Exponential backoff with jitter is applied before
retrying on "
+ + "409 (Conflict) and 429 (Too Many Requests) responses.
Other retryable "
+ + "errors are retried immediately. This should be positive
number.")
.typeParameterClass(Long.class)
.defaultValue(3L)
.build();
@@ -592,6 +597,62 @@ public final class SparkOperatorConf {
.defaultValue(5)
.build();
+ /**
+ * Initial backoff (in milliseconds) between retries when creating secondary resources. Backoff
+ * is applied only on 409 (Conflict) and 429 (Too Many Requests) responses.
+ *
+ * <p>{@code BackoffUtils.computeBackoffDelay} multiplies this value by a power of the backoff
+ * multiplier to obtain the delay. The default is 1000 ms.
+ */
+ public static final ConfigOption<Long>
API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS =
+ ConfigOption.<Long>builder()
+
.key("spark.kubernetes.operator.api.secondaryResourceCreateInitialBackoffMillis")
+ .enableDynamicOverride(false)
+ .description(
+ "Initial backoff (in milliseconds) between retries "
+ + "when creating secondary resources for Spark application. "
+ + "Backoff is applied only on 409 (Conflict) and "
+ + "429 (Too Many Requests) responses.")
+ .typeParameterClass(Long.class)
+ .defaultValue(1000L)
+ .build();
+
+ /**
+ * Maximum backoff (in milliseconds) between retries when creating secondary resources.
+ *
+ * <p>{@code BackoffUtils.computeBackoffDelay} applies this cap before adding jitter, so the
+ * actual sleep can exceed this value by up to the configured jitter. The default is 40000 ms.
+ */
+ public static final ConfigOption<Long>
API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS =
+ ConfigOption.<Long>builder()
+
.key("spark.kubernetes.operator.api.secondaryResourceCreateMaxBackoffMillis")
+ .enableDynamicOverride(false)
+ .description(
+ "Maximum backoff (in milliseconds) between retries when creating
"
+ + "secondary resources for Spark application.")
+ .typeParameterClass(Long.class)
+ .defaultValue(40000L)
+ .build();
+
+ /**
+ * Backoff multiplier applied to each retry interval when creating secondary resources.
+ *
+ * <p>{@code BackoffUtils.computeBackoffDelay} raises this value to the power
+ * {@code attemptCount - 2} and multiplies the initial backoff by the result. The default is 2.0.
+ */
+ public static final ConfigOption<Double>
API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER =
+ ConfigOption.<Double>builder()
+
.key("spark.kubernetes.operator.api.secondaryResourceCreateBackoffMultiplier")
+ .enableDynamicOverride(false)
+ .description(
+ "Backoff multiplier applied to each retry interval when creating
"
+ + "secondary resources for Spark application.")
+ .typeParameterClass(Double.class)
+ .defaultValue(2.0)
+ .build();
+
+ /**
+ * Jitter (in milliseconds) added to backoff delay between retries when creating secondary
+ * resources.
+ *
+ * <p>{@code BackoffUtils.computeBackoffDelay} adds a random value between 0 and this value
+ * (inclusive) to the capped delay; a non-positive value disables jitter. The default is 500 ms.
+ */
+ public static final ConfigOption<Long>
API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS =
+ ConfigOption.<Long>builder()
+
.key("spark.kubernetes.operator.api.secondaryResourceCreateBackoffJitterMillis")
+ .enableDynamicOverride(false)
+ .description(
+ "Jitter (in milliseconds) added to backoff delay between retries
when "
+ + "creating secondary resources for Spark application.")
+ .typeParameterClass(Long.class)
+ .defaultValue(500L)
+ .build();
+
private SparkOperatorConf() {}
/**
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/BackoffUtils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/BackoffUtils.java
new file mode 100644
index 0000000..553759b
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/BackoffUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import static org.apache.spark.k8s.operator.config.SparkOperatorConf.*;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Utility class for exponential backoff with jitter on retryable API errors.
+ *
+ * <p>The backoff parameters are read from the {@code API_SECONDARY_RESOURCE_CREATE_*} options of
+ * {@code SparkOperatorConf} on every call to {@code backoffSleep}.
+ */
+@Slf4j
+public final class BackoffUtils {
+
+  private BackoffUtils() {}
+
+  /**
+   * Sleeps on the calling thread for an exponentially growing, jittered delay before the caller
+   * retries resource creation.
+   *
+   * <p>If the thread is interrupted while sleeping, the method returns early and restores the
+   * thread's interrupt flag.
+   *
+   * @param responseCode HTTP status code of the failed request; used for logging only
+   * @param attemptCount attempt counter supplied by the caller; drives the exponent of the delay
+   *     and is logged
+   * @param maxAttempts maximum number of attempts; used for logging only
+   * @see <a href="https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/">
+   *     Exponential Backoff and Jitter</a>
+   */
+  public static void backoffSleep(int responseCode, long attemptCount, long maxAttempts) {
+    long initialBackoffMs = API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS.getValue();
+    long maxBackoffMs = API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS.getValue();
+    double backoffMultiplier = API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER.getValue();
+    long jitterMs = API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS.getValue();
+    long actualDelay =
+        computeBackoffDelay(
+            initialBackoffMs, backoffMultiplier, attemptCount, maxBackoffMs, jitterMs);
+    log.info(
+        "Retrying resource creation with exponential backoff. "
+            + "Delay: {}ms, responseCode: {}, attempt: {}/{}",
+        actualDelay,
+        responseCode,
+        attemptCount,
+        maxAttempts);
+    try {
+      Thread.sleep(actualDelay);
+    } catch (InterruptedException ex) {
+      log.info("Backoff sleep interrupted, waking up early to retry resource creation");
+      // Restore the flag so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Computes {@code min(initialBackoffMs * backoffMultiplier^(attemptCount - 2), maxBackoffMs)}
+   * plus a random jitter in {@code [0, jitterMs]}.
+   *
+   * <p>The result is never negative and saturates at {@link Long#MAX_VALUE}, so it is always a
+   * valid argument for {@link Thread#sleep(long)}, which rejects negative values with an
+   * {@link IllegalArgumentException}.
+   *
+   * @param initialBackoffMs delay returned for {@code attemptCount == 2}, in milliseconds
+   * @param backoffMultiplier growth factor applied once per additional attempt
+   * @param attemptCount attempt counter; NOTE(review): the {@code - 2} offset presumably assumes
+   *     the caller has already advanced the counter past the failed attempt - confirm against
+   *     the caller
+   * @param maxBackoffMs upper bound for the exponential part of the delay, in milliseconds
+   * @param jitterMs inclusive upper bound of the random jitter, in milliseconds; a non-positive
+   *     value disables jitter
+   * @return the delay to sleep, in milliseconds
+   */
+  static long computeBackoffDelay(
+      long initialBackoffMs,
+      double backoffMultiplier,
+      long attemptCount,
+      long maxBackoffMs,
+      long jitterMs) {
+    // The double-to-long cast saturates at Long.MIN_VALUE/MAX_VALUE and maps NaN to 0.
+    long exponentialDelay =
+        (long) (initialBackoffMs * Math.pow(backoffMultiplier, attemptCount - 2));
+    // Clamp at zero: a negative configured backoff must not reach Thread.sleep.
+    long cappedDelay = Math.max(0L, Math.min(exponentialDelay, maxBackoffMs));
+    long jitter = 0L;
+    if (jitterMs > 0) {
+      // nextLong's bound is exclusive; avoid overflowing jitterMs + 1 at Long.MAX_VALUE.
+      long exclusiveBound = jitterMs == Long.MAX_VALUE ? jitterMs : jitterMs + 1;
+      jitter = ThreadLocalRandom.current().nextLong(0, exclusiveBound);
+    }
+    // Saturating add: both operands are non-negative here.
+    return cappedDelay > Long.MAX_VALUE - jitter ? Long.MAX_VALUE : cappedDelay + jitter;
+  }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
index b042f4b..0896b70 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
@@ -48,6 +48,7 @@ import
io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.k8s.operator.BaseResource;
+import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
/** Utility class for reconciler operations. */
@@ -132,6 +133,8 @@ public final class ReconcilerUtils {
return current;
}
}
+ } else if (e.getCode() == Constants.HTTP_TOO_MANY_REQUESTS) {
+ log.debug("Server returned 429 Too Many Requests, will retry with
backoff");
} else if (isTransientError(e) || e.getCode() ==
HTTP_INTERNAL_ERROR) {
// GET to avoid duplicate create attempt for timeouts (0) and
transient 5xx
current = getResource(client, resource);
@@ -145,6 +148,9 @@ public final class ReconcilerUtils {
log.error("Max Retries exceeded while trying to create resource");
throw e;
}
+ if (shouldBackoffBeforeRetry(e)) {
+ BackoffUtils.backoffSleep(e.getCode(), attemptCount, maxAttempts);
+ }
}
}
}
@@ -222,6 +228,13 @@ public final class ReconcilerUtils {
}
}
+  /**
+   * Decides whether the retry loop should sleep before the next attempt.
+   *
+   * @param e the exception raised by the failed Kubernetes API call
+   * @return {@code true} only for 409 (Conflict) and 429 (Too Many Requests) response codes
+   */
+  private static boolean shouldBackoffBeforeRetry(KubernetesClientException e) {
+    int code = e.getCode();
+    return code == HTTP_CONFLICT || code == Constants.HTTP_TOO_MANY_REQUESTS;
+  }
+
private static boolean isTransientError(KubernetesClientException e) {
// code 0 is fabric8's sentinel for network-level failures (timeouts,
connection resets, etc.)
return switch (e.getCode()) {
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
index f9820a9..5164101 100644
---
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
@@ -19,6 +19,13 @@
package org.apache.spark.k8s.operator.reconciler;
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
+import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS;
+import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER;
+import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS;
+import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS;
+import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS;
+import static org.apache.spark.k8s.operator.utils.TestUtils.setConfigKey;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,13 +34,18 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Optional;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.dsl.NamespaceableResource;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import org.junit.jupiter.api.BeforeEach;
@@ -173,4 +185,77 @@ class SparkAppReconcilerTest {
utils.verifyNoInteractions();
}
}
+
+  /**
+   * Creates a mocked {@link NamespaceableResource} and stubs {@code mockClient.resource(pod)} to
+   * return it.
+   *
+   * @param pod the pod the client is asked to operate on
+   * @return the mock handed out by {@code mockClient} for {@code pod}
+   */
+  @SuppressWarnings("unchecked")
+  private NamespaceableResource<Pod> mockSecondaryResource(Pod pod) {
+    NamespaceableResource<Pod> podResource = mock(NamespaceableResource.class);
+    when(mockClient.resource(pod)).thenReturn(podResource);
+    return podResource;
+  }
+
+  /** Initial backoff for the retry test; kept small so the test sleeps only briefly. */
+  private static final long FAST_INITIAL_BACKOFF_MILLIS = 50L;
+
+  /** Maximum backoff for the retry test. */
+  private static final long FAST_MAX_BACKOFF_MILLIS = 100L;
+
+  /** Overrides the secondary resource backoff options with short, jitter-free values. */
+  private void setFastBackoff() {
+    setConfigKey(API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS, 3L);
+    setConfigKey(
+        API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS, FAST_INITIAL_BACKOFF_MILLIS);
+    setConfigKey(API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS, FAST_MAX_BACKOFF_MILLIS);
+    setConfigKey(API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS, 0L);
+    setConfigKey(API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER, 2.0);
+  }
+
+  /** Writes the given values back into the secondary resource retry and backoff options. */
+  private void restoreBackoffDefaults(
+      long maxAttempts,
+      long initialBackoff,
+      long maxBackoff,
+      long jitter,
+      double multiplier) {
+    setConfigKey(API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS, maxAttempts);
+    setConfigKey(API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS, initialBackoff);
+    setConfigKey(API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS, maxBackoff);
+    setConfigKey(API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS, jitter);
+    setConfigKey(API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER, multiplier);
+  }
+
+  /** Builds a minimal pod named {@code test-pod} in the {@code default} namespace. */
+  private Pod buildTestPod() {
+    return new PodBuilder()
+        .withNewMetadata()
+        .withName("test-pod")
+        .withNamespace("default")
+        .endMetadata()
+        .build();
+  }
+
+  /**
+   * Verifies that a 409 (Conflict) on the first create attempt is followed by a backoff sleep and
+   * a second, successful create attempt.
+   */
+  @Test
+  void secondaryResourceCreationRetriesWithBackoffOnConflict() {
+    // Remember the current option values so the finally block can put them back.
+    long savedMaxAttempts = API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS.getValue();
+    long savedInitialBackoff = API_SECONDARY_RESOURCE_CREATE_INITIAL_BACKOFF_MILLIS.getValue();
+    long savedMaxBackoff = API_SECONDARY_RESOURCE_CREATE_MAX_BACKOFF_MILLIS.getValue();
+    long savedJitter = API_SECONDARY_RESOURCE_CREATE_BACKOFF_JITTER_MILLIS.getValue();
+    double savedMultiplier = API_SECONDARY_RESOURCE_CREATE_BACKOFF_MULTIPLIER.getValue();
+    try {
+      setFastBackoff();
+      Pod pod = buildTestPod();
+      NamespaceableResource<Pod> mockResource = mockSecondaryResource(pod);
+      KubernetesClientException conflict =
+          new KubernetesClientException("conflict", HTTP_CONFLICT, null);
+      when(mockResource.get()).thenReturn(null);
+      when(mockResource.create()).thenThrow(conflict).thenReturn(pod);
+
+      // nanoTime is monotonic; currentTimeMillis can jump when the wall clock is adjusted.
+      long startNanos = System.nanoTime();
+      Optional<Pod> result = ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod);
+      long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
+
+      assertTrue(result.isPresent());
+      verify(mockResource, times(2)).create();
+      assertTrue(
+          elapsedMillis >= FAST_INITIAL_BACKOFF_MILLIS,
+          "Expected backoff delay of at least "
+              + FAST_INITIAL_BACKOFF_MILLIS
+              + "ms, but was: "
+              + elapsedMillis
+              + "ms");
+    } finally {
+      restoreBackoffDefaults(
+          savedMaxAttempts, savedInitialBackoff, savedMaxBackoff, savedJitter, savedMultiplier);
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]