This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 1570fc5  [SPARK-55750] Retry creating secondary resources on 
`408/500/502/503/504` error codes and network timeouts
1570fc5 is described below

commit 1570fc527c84962cacf0f9e4e8b2988f5c87e75c
Author: Zhou JIANG <[email protected]>
AuthorDate: Mon Mar 9 09:54:28 2026 -0700

    [SPARK-55750] Retry creating secondary resources on `408/500/502/503/504` 
error codes and network timeouts
    
    ### What changes were proposed in this pull request?
    
    This commit extends the existing retry logic in 
`ReconcilerUtils::getOrCreateSecondaryResource` to also retry on transient API 
server errors (408, 502, 503, 504), HTTP 500 (Internal Server Error), and 
network-level timeouts, in addition to the existing retry on 409 Conflict.
    
    Also added unit test coverage for the retry logic.
    
    ### Why are the changes needed?
    
    Transient API server overload and network-level timeouts are recoverable 
conditions. Without retrying, a single dropped response causes the entire 
reconcile loop to fail and would mark the app as failed to schedule. This change 
makes the scheduling logic more robust.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No behavioral change for healthy apps. On transient API server failures, 
the operator now retries up to 
`spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts` times 
(default 3) rather than immediately propagating the error.
    
    ### How was this patch tested?
    
    Added unit test to validate the retry behavior.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #530 from jiangzho/init_etry.
    
    Authored-by: Zhou JIANG <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 docs/config_properties.md                          |   2 +-
 .../k8s/operator/config/SparkOperatorConf.java     |   8 +-
 .../spark/k8s/operator/utils/ReconcilerUtils.java  |  26 +++-
 .../k8s/operator/utils/ReconcilerUtilsTest.java    | 133 +++++++++++++++++++++
 4 files changed, 162 insertions(+), 7 deletions(-)

diff --git a/docs/config_properties.md b/docs/config_properties.md
index adac1a4..2edeeda 100644
--- a/docs/config_properties.md
+++ b/docs/config_properties.md
@@ -4,7 +4,7 @@
  | --- | --- | --- | --- | --- | 
  | spark.kubernetes.operator.api.retryAttemptAfterSeconds | Long | 1 | false | 
Default time (in seconds) to wait till next request. This would be used if 
server does not set Retry-After in response. Setting this to non-positive 
number means immediate retry. | 
  | spark.kubernetes.operator.api.retryMaxAttempts | Integer | 15 | false | Max 
attempts of retries on unhandled controller errors. Setting this to 
non-positive value means no retry. | 
- | spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts | Long | 3 
| false | Maximal number of retry attempts of requesting secondary resource for 
Spark application. This would be performed on top of k8s client 
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting 
reconcile on the same SparkApplication. This should be positive number | 
+ | spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts | Long | 3 
| false | Maximal number of retry attempts of requesting secondary resource for 
Spark application. This would be performed on top of k8s client 
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting 
reconcile on the same SparkApplication, as well as API server errors 
(408/500/502/503/504) and network-level timeouts. This should be positive 
number. | 
  | spark.kubernetes.operator.api.statusPatchMaxAttempts | Long | 3 | false | 
Maximal number of retry attempts of requests to k8s server for resource status 
update. This would be performed on top of k8s client 
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting 
update on the same SparkApplication. This should be positive number. | 
  | spark.kubernetes.operator.dynamicConfig.enabled | Boolean | false | false | 
When enabled, operator would use config map as source of truth for config 
property override. The config map need to be created in 
spark.kubernetes.operator.namespace, and labeled with operator name. | 
  | spark.kubernetes.operator.dynamicConfig.reconcilerParallelism | Integer | 1 
| false | Parallelism for dynamic config reconciler. Unbounded pool would be 
used if set to non-positive number. | 
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
index 9a3b9df..9b8c54f 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
@@ -380,7 +380,8 @@ public final class SparkOperatorConf {
   /**
    * Maximal number of retry attempts of requesting secondary resource for 
Spark application. This
    * would be performed on top of k8s client 
spark.kubernetes.operator.retry.maxAttempts to overcome
-   * potential conflicting reconcile on the same SparkApplication. This should 
be positive number
+   * potential conflicting reconcile on the same SparkApplication, as well as 
transient API server
+   * errors and network-level timeouts. This should be positive number
    */
   public static final ConfigOption<Long> 
API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS =
       ConfigOption.<Long>builder()
@@ -390,8 +391,9 @@ public final class SparkOperatorConf {
               "Maximal number of retry attempts of requesting secondary 
resource for Spark "
                   + "application. This would be performed on top of k8s client 
"
                   + "spark.kubernetes.operator.retry.maxAttempts to overcome 
potential "
-                  + "conflicting reconcile on the same SparkApplication. This 
should be "
-                  + "positive number")
+                  + "conflicting reconcile on the same SparkApplication, as 
well as API "
+                  + "server errors (408/500/502/503/504) and network-level 
timeouts. This should "
+                  + "be positive number.")
           .typeParameterClass(Long.class)
           .defaultValue(3L)
           .build();
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
index 368313d..b042f4b 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java
@@ -19,8 +19,13 @@
 
 package org.apache.spark.k8s.operator.utils;
 
+import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
+import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
 import static java.net.HttpURLConnection.HTTP_CONFLICT;
+import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
 import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.API_SECONDARY_RESOURCE_CREATE_MAX_ATTEMPTS;
 import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.RECONCILER_FOREGROUND_REQUEST_TIMEOUT_SECONDS;
 import static 
org.apache.spark.k8s.operator.utils.ModelUtils.buildOwnerReferenceTo;
@@ -127,13 +132,19 @@ public final class ReconcilerUtils {
                 return current;
               }
             }
-            if (++attemptCount > maxAttempts) {
-              log.error("Max Retries exceeded while trying to create 
resource");
-              throw e;
+          } else if (isTransientError(e) || e.getCode() == 
HTTP_INTERNAL_ERROR) {
+            // GET to avoid duplicate create attempt for timeouts (0) and 
transient 5xx
+            current = getResource(client, resource);
+            if (current.isPresent()) {
+              return current;
             }
           } else {
             throw e;
           }
+          if (++attemptCount > maxAttempts) {
+            log.error("Max Retries exceeded while trying to create resource");
+            throw e;
+          }
         }
       }
     }
@@ -211,6 +222,15 @@ public final class ReconcilerUtils {
     }
   }
 
+  private static boolean isTransientError(KubernetesClientException e) {
+    // code 0 is fabric8's sentinel for network-level failures (timeouts, 
connection resets, etc.)
+    return switch (e.getCode()) {
+      case 0, HTTP_CLIENT_TIMEOUT, HTTP_BAD_GATEWAY,
+           HTTP_UNAVAILABLE, HTTP_GATEWAY_TIMEOUT -> true;
+      default -> false;
+    };
+  }
+
   /**
    * Clones an object using JSON serialization and deserialization.
    *
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/ReconcilerUtilsTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/ReconcilerUtilsTest.java
new file mode 100644
index 0000000..9847892
--- /dev/null
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/ReconcilerUtilsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Optional;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.dsl.NamespaceableResource;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class ReconcilerUtilsTest {
+
+  private Pod buildPod() {
+    return new PodBuilder()
+        .withNewMetadata()
+        .withName("test-pod")
+        .withNamespace("default")
+        .endMetadata()
+        .build();
+  }
+
+  @SuppressWarnings("unchecked")
+  private NamespaceableResource<Pod> mockClientReturning(
+      KubernetesClient mockClient, Pod pod) {
+    NamespaceableResource<Pod> mockResource = 
mock(NamespaceableResource.class);
+    when(mockClient.resource(pod)).thenReturn(mockResource);
+    return mockResource;
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {500, 502, 503, 504})
+  void retriesCreateOnTransient5xxAndSucceeds(int errorCode) {
+    Pod pod = buildPod();
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    NamespaceableResource<Pod> mockResource = mockClientReturning(mockClient, 
pod);
+    when(mockResource.get()).thenReturn(null);
+    // 1st CREATE -> fail; 2nd CREATE -> success
+    when(mockResource.create())
+        .thenThrow(new KubernetesClientException("Transient error", errorCode, 
null))
+        // succeeds on 2nd attempt
+        .thenReturn(pod);
+
+    Optional<Pod> result = 
ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod);
+    assertTrue(result.isPresent());
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {500, 502, 503, 504})
+  void returnsResourceFoundByGetAfterTransient5xx(int errorCode) {
+    Pod pod = buildPod();
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    NamespaceableResource<Pod> mockResource = mockClientReturning(mockClient, 
pod);
+    // 1st GET -> not found; 2nd GET (after failed create) -> resource found
+    // mimic create landed on server but response was lost
+    when(mockResource.get()).thenReturn(null).thenReturn(pod);
+    when(mockResource.create())
+        .thenThrow(new KubernetesClientException("Transient error", errorCode, 
null));
+
+    Optional<Pod> result = 
ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod);
+    assertTrue(result.isPresent());
+  }
+
+  @Test
+  void retriesOnNetworkLevelTimeout() {
+    // Network-level timeout surfaces as KubernetesClientException with code 0 
in fabric8
+    Pod pod = buildPod();
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    NamespaceableResource<Pod> mockResource = mockClientReturning(mockClient, 
pod);
+    // 1st GET -> not found; and GET (after timeout) -> resource found
+    when(mockResource.get()).thenReturn(null).thenReturn(pod);
+    when(mockResource.create())
+        .thenThrow(new KubernetesClientException("Connection timeout", 0, 
null));
+
+    Optional<Pod> result = 
ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod);
+    assertTrue(result.isPresent());
+  }
+
+  @Test
+  void throwsAfterMaxAttemptsExceededOnTransientError() {
+    Pod pod = buildPod();
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    NamespaceableResource<Pod> mockResource = mockClientReturning(mockClient, 
pod);
+    // resource never appears in any GET
+    when(mockResource.get()).thenReturn(null);
+    when(mockResource.create())
+        .thenThrow(new KubernetesClientException("Service unavailable", 503, 
null));
+
+    assertThrows(
+        KubernetesClientException.class,
+        () -> ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod));
+  }
+
+  @Test
+  void doesNotRetryOnNonTransientError() {
+    Pod pod = buildPod();
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    NamespaceableResource<Pod> mockResource = mockClientReturning(mockClient, 
pod);
+    when(mockResource.get()).thenReturn(null);
+    when(mockResource.create())
+        .thenThrow(new KubernetesClientException("Unprocessable entity", 422, 
null));
+
+    assertThrows(
+        KubernetesClientException.class,
+        () -> ReconcilerUtils.getOrCreateSecondaryResource(mockClient, pod));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to