This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 1570fc5  [SPARK-55750] Retry creating secondary resources on `408/500/502/503/504` error codes and network timeouts
1570fc5 is described below
commit 1570fc527c84962cacf0f9e4e8b2988f5c87e75c
Author: Zhou JIANG <[email protected]>
AuthorDate: Mon Mar 9 09:54:28 2026 -0700
[SPARK-55750] Retry creating secondary resources on `408/500/502/503/504` error codes and network timeouts
### What changes were proposed in this pull request?
This commit extends the existing retry logic in
`ReconcilerUtils::getOrCreateSecondaryResource` to also retry on transient API
server errors (408, 502, 503, 504), internal server errors (500), and
network-level timeouts, in addition to the existing 409 conflict handling.
It also adds unit test coverage for the retry logic.
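For readers skimming the diff below, the heart of the change is a status-code classification that decides whether a failed create is worth retrying. The following is a minimal, self-contained sketch with hypothetical names (the actual implementation works on fabric8's `KubernetesClientException` and checks 500 via a separate condition):

```java
import java.net.HttpURLConnection;

public class TransientErrorSketch {
  // Decides whether an HTTP status code from a failed create call is worth
  // retrying. fabric8 reports network-level failures (timeouts, connection
  // resets) with a sentinel code of 0.
  static boolean isTransient(int code) {
    switch (code) {
      case 0:                                      // network-level failure
      case HttpURLConnection.HTTP_CLIENT_TIMEOUT:  // 408
      case HttpURLConnection.HTTP_INTERNAL_ERROR:  // 500
      case HttpURLConnection.HTTP_BAD_GATEWAY:     // 502
      case HttpURLConnection.HTTP_UNAVAILABLE:     // 503
      case HttpURLConnection.HTTP_GATEWAY_TIMEOUT: // 504
        return true;
      default:
        return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isTransient(503)); // transient: retry
    System.out.println(isTransient(422)); // non-transient: propagate
  }
}
```

Note that 409 is deliberately absent here: conflicts take a different path (a GET to pick up the resource another reconcile already created) rather than a blind re-create.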
### Why are the changes needed?
Transient API server overload and network-level timeouts are recoverable
conditions. Without retrying, a single dropped response causes the entire
reconcile loop to fail and marks the app as scheduling failed. This change
makes the scheduling logic more robust.
### Does this PR introduce _any_ user-facing change?
No behavioral change for healthy apps. On transient API server failures,
the operator now retries up to
`spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts` times
(default 3) rather than immediately propagating the error.
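The attempt accounting behaves like a conventional retry budget: the operation is retried until the configured number of extra attempts is spent, then the last error propagates to the caller. A generic sketch under hypothetical names (not the operator's code):

```java
import java.util.function.Supplier;

public class RetrySketch {
  // Runs op, retrying on RuntimeException up to maxAttempts extra times;
  // once the budget is exhausted, the last failure is rethrown.
  static <T> T withRetries(Supplier<T> op, int maxAttempts) {
    int attemptCount = 0;
    while (true) {
      try {
        return op.get();
      } catch (RuntimeException e) {
        if (++attemptCount > maxAttempts) {
          throw e; // budget exhausted: propagate to the reconcile loop
        }
      }
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // Fails twice, then succeeds on the third call; budget of 3 covers it.
    String result =
        withRetries(
            () -> {
              if (++calls[0] < 3) {
                throw new RuntimeException("transient");
              }
              return "ok";
            },
            3);
    System.out.println(result + " after " + calls[0] + " calls");
  }
}
```

With the default budget of 3, up to four total calls are made (one initial plus three retries) before the error is surfaced.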
### How was this patch tested?
Added unit tests to validate the retry behavior.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #530 from jiangzho/init_etry.
Authored-by: Zhou JIANG <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
docs/config_properties.md | 2 +-
.../k8s/operator/config/SparkOperatorConf.java | 8 +-
.../spark/k8s/operator/utils/ReconcilerUtils.java | 26 +++-
.../k8s/operator/utils/ReconcilerUtilsTest.java | 133 +++++++++++++++++++++
4 files changed, 162 insertions(+), 7 deletions(-)
diff --git a/docs/config_properties.md b/docs/config_properties.md
index adac1a4..2edeeda 100644
--- a/docs/config_properties.md
+++ b/docs/config_properties.md
@@ -4,7 +4,7 @@
 | --- | --- | --- | --- | --- |
 | spark.kubernetes.operator.api.retryAttemptAfterSeconds | Long | 1 | false | Default time (in seconds) to wait till next request. This would be used if server does not set Retry-After in response. Setting this to non-positive number means immediate retry. |
 | spark.kubernetes.operator.api.retryMaxAttempts | Integer | 15 | false | Max attempts of retries on unhandled controller errors. Setting this to non-positive value means no retry. |
-| spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts | Long | 3 | false | Maximal number of retry attempts of requesting secondary resource for Spark application. This would be performed on top of k8s client spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting reconcile on the same SparkApplication. This should be positive number |
+| spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts | Long | 3 | false | Maximal number of retry attempts of requesting secondary resource for Spark application. This would be performed on top of k8s client spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting reconcile on the same SparkApplication, as well as API server errors (408/500/502/503/504) and network-level timeouts. This should be positive number. |
 | spark.kubernetes.operator.api.statusPatchMaxAttempts | Long | 3 | false | Maximal number of retry attempts of requests to k8s server for resource status update. This would be performed on top of k8s client spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting update on the same SparkApplication. This should be positive number. |
 | spark.kubernetes.operator.dynamicConfig.enabled | Boolean | false | false | When enabled, operator would use config map as source of truth for config property override. The config map need to be created in spark.kubernetes.operator.namespace, and labeled with operator name. |
 | spark.kubernetes.operator.dynamicConfig.reconcilerParallelism | Integer | 1 | false | Parallelism for dynamic config reconciler. Unbounded pool would be used if set to non-positive number. |
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
index 9a3b9df..9b8c54f 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
@@ -380,7 +380,8 @@ public final class SparkOperatorConf {
   /**
    * Maximal number of retry attempts of requesting secondary resource for Spark application. This
    * would be performed on top of k8s client spark.kubernetes.operator.retry.maxAttempts to overcome
-   * potential conflicting reconcile on the same SparkApplication. This should be positive number
+   * potential conflicting reconcile on the same SparkApplication, as well as transient API server
+   * errors and network-level timeouts. This should be positive number
    */
   public static final ConfigOption<Long> API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS =
       ConfigOption.<Long>builder()
@@ -390,8 +391,9 @@ public final class SparkOperatorConf {
               "Maximal number of retry attempts of requesting secondary resource for Spark "
                   + "application. This would be performed on top of k8s client "
                   + "spark.kubernetes.operator.retry.maxAttempts to overcome potential "
-                  + "conflicting reconcile on the same SparkApplication. This should be "
-                  + "positive number")
+                  + "conflicting reconcile on the same SparkApplication, as well as API "
+                  + "server errors (408/500/502/503/504) and network-level timeouts. This should "
+                  + "be positive number.")
           .typeParameterClass(Long.class)
           .defaultValue(3L)
           .build();
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
index 368313d..b042f4b 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
@@ -19,8 +19,13 @@
package org.apache.spark.k8s.operator.utils;
+import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
+import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
+import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
 import static org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS;
 import static org.apache.spark.k8s.operator.config.SparkOperatorConf.RECONCILER_FOREGROUND_REQUEST_TIMEOUT_SECONDS;
 import static org.apache.spark.k8s.operator.utils.ModelUtils.buildOwnerReferenceTo;
@@ -127,13 +132,19 @@ public final class ReconcilerUtils {
return current;
}
}
-          if (++attemptCount > maxAttempts) {
-            log.error("Max Retries exceeded while trying to create resource");
-            throw e;
+        } else if (isTransientError(e) || e.getCode() == HTTP_INTERNAL_ERROR) {
+          // GET to avoid duplicate create attempt for timeouts (0) and transient 5xx
+          current = getResource(client, resource);
+          if (current.isPresent()) {
+            return current;
           }
         } else {
           throw e;
         }
+        if (++attemptCount > maxAttempts) {
+          log.error("Max Retries exceeded while trying to create resource");
+          throw e;
+        }
}
}
}
@@ -211,6 +222,15 @@ public final class ReconcilerUtils {
}
}
+  private static boolean isTransientError(KubernetesClientException e) {
+    // code 0 is fabric8's sentinel for network-level failures (timeouts, connection resets, etc.)
+    return switch (e.getCode()) {
+      case 0, HTTP_CLIENT_TIMEOUT, HTTP_BAD_GATEWAY, HTTP_UNAVAILABLE, HTTP_GATEWAY_TIMEOUT -> true;
+      default -> false;
+    };
+  }
+
/**
* Clones an object using JSON serialization and deserialization.
*
diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/ReconcilerUtilsTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/ReconcilerUtilsTest.java
new file mode 100644
index 0000000..9847892
--- /dev/null
+++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/ReconcilerUtilsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Optional;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.dsl.NamespaceableResource;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class ReconcilerUtilsTest {
+
+ private Pod buildPod() {
+ return new PodBuilder()
+ .withNewMetadata()
+ .withName("test-pod")
+ .withNamespace("default")
+ .endMetadata()
+ .build();
+ }
+
+  @SuppressWarnings("unchecked")
+  private NamespaceableResource<Pod> mockClientReturning(KubernetesClient mockClient, Pod pod) {
+    NamespaceableResource<Pod> mockResource = mock(NamespaceableResource.class);
+    when(mockClient.resource(pod)).thenReturn(mockResource);
+    return mockResource;
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {500, 502, 503, 504})
+  void retriesCreateOnTransient5xxAndSucceeds(int errorCode) {
+    Pod pod = buildPod();
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    NamespaceableResource<Pod> mockResource = mockClientReturning(mockClient, pod);
+    when(mockResource.get()).thenReturn(null);
+    // 1st CREATE -> fail; 2nd CREATE -> success
+    when(mockResource.create())
+        .thenThrow(new KubernetesClientException("Transient error", errorCode, null))
+        // succeeds on 2nd attempt
+        .thenReturn(pod);
+
+    Optional<Pod> result = ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod);
+    assertTrue(result.isPresent());
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {500, 502, 503, 504})
+  void returnsResourceFoundByGetAfterTransient5xx(int errorCode) {
+    Pod pod = buildPod();
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    NamespaceableResource<Pod> mockResource = mockClientReturning(mockClient, pod);
+    // 1st GET -> not found; 2nd GET (after failed create) -> resource found
+    // mimic create landed on server but response was lost
+    when(mockResource.get()).thenReturn(null).thenReturn(pod);
+    when(mockResource.create())
+        .thenThrow(new KubernetesClientException("Transient error", errorCode, null));
+
+    Optional<Pod> result = ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod);
+    assertTrue(result.isPresent());
+  }
+
+  @Test
+  void retriesOnNetworkLevelTimeout() {
+    // Network-level timeout surfaces as KubernetesClientException with code 0 in fabric8
+    Pod pod = buildPod();
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    NamespaceableResource<Pod> mockResource = mockClientReturning(mockClient, pod);
+    // 1st GET -> not found; 2nd GET (after timeout) -> resource found
+    when(mockResource.get()).thenReturn(null).thenReturn(pod);
+    when(mockResource.create())
+        .thenThrow(new KubernetesClientException("Connection timeout", 0, null));
+
+    Optional<Pod> result = ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod);
+    assertTrue(result.isPresent());
+  }
+
+  @Test
+  void throwsAfterMaxAttemptsExceededOnTransientError() {
+    Pod pod = buildPod();
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    NamespaceableResource<Pod> mockResource = mockClientReturning(mockClient, pod);
+    // resource never appears in any GET
+    when(mockResource.get()).thenReturn(null);
+    when(mockResource.create())
+        .thenThrow(new KubernetesClientException("Service unavailable", 503, null));
+
+    assertThrows(
+        KubernetesClientException.class,
+        () -> ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod));
+  }
+
+  @Test
+  void doesNotRetryOnNonTransientError() {
+    Pod pod = buildPod();
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    NamespaceableResource<Pod> mockResource = mockClientReturning(mockClient, pod);
+    when(mockResource.get()).thenReturn(null);
+    when(mockResource.create())
+        .thenThrow(new KubernetesClientException("Unprocessable entity", 422, null));
+
+    assertThrows(
+        KubernetesClientException.class,
+        () -> ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod));
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]