This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new acc216f67ef Start building out more robust fabric8 retries (#18406)
acc216f67ef is described below

commit acc216f67ef0411eeee6343e96e13c0797c5cc73
Author: Lucas Capistrant <[email protected]>
AuthorDate: Mon Aug 25 17:05:10 2025 -0500

    Start building out more robust fabric8 retries (#18406)
    
    * Start building out more robust fabric8 retries
    
    * refactor based on review
---
 .../k8s/overlord/common/KubernetesPeonClient.java  | 178 ++++++++++++++++++---
 .../overlord/common/KubernetesPeonClientTest.java  | 138 +++++++++++++++-
 2 files changed, 295 insertions(+), 21 deletions(-)

diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
index 70f76076a9a..406258f8147 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
@@ -29,6 +29,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
+import io.vertx.core.http.HttpClosedException;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
@@ -88,29 +89,20 @@ public class KubernetesPeonClient
       String jobName = job.getMetadata().getName();
 
       log.info("Submitting job[%s] for task[%s].", jobName, task.getId());
-      client.batch()
-            .v1()
-            .jobs()
-            .inNamespace(namespace)
-            .resource(job)
-            .create();
+      createK8sJobWithRetries(job);
       log.info("Submitted job[%s] for task[%s]. Waiting for POD to launch.", 
jobName, task.getId());
 
-      // wait until the pod is running or complete or failed, any of those is 
fine
+      // Wait for the pod to be available
       Pod mainPod = getPeonPodWithRetries(jobName);
-      Pod result = client.pods()
-                         .inNamespace(namespace)
-                         .withName(mainPod.getMetadata().getName())
-                         .waitUntilCondition(pod -> {
-                           if (pod == null) {
-                             return true;
-                           }
-                           return pod.getStatus() != null && 
pod.getStatus().getPodIP() != null;
-                         }, howLong, timeUnit);
-      
+      log.info("Pod for job[%s] launched for task[%s]. Waiting for pod to be 
in running state.", jobName, task.getId());
+
+      // Wait for the pod to be in state running, completed, or failed.
+      Pod result = waitForPodResultWithRetries(mainPod, howLong, timeUnit);
+
       if (result == null) {
         throw new ISE("K8s pod for the task [%s] appeared and disappeared. It 
can happen if the task was canceled", task.getId());
       }
+      log.info("Pod for job[%s] is in state [%s] for task[%s].", jobName, 
result.getStatus().getPhase(), task.getId());
       long duration = System.currentTimeMillis() - start;
       emitK8sPodMetrics(task, "k8s/peon/startup/time", duration);
       return result;
@@ -290,11 +282,136 @@ public class KubernetesPeonClient
     return pods.isEmpty() ? Optional.absent() : Optional.of(pods.get(0));
   }
 
+  public Pod waitForPodResultWithRetries(final Pod pod, long howLong, TimeUnit 
timeUnit)
+  {
+    return clientApi.executeRequest(client -> 
waitForPodResultWithRetries(client, pod, howLong, timeUnit, 5, 
RetryUtils.DEFAULT_MAX_TRIES));
+  }
+
   public Pod getPeonPodWithRetries(String jobName)
   {
     return clientApi.executeRequest(client -> getPeonPodWithRetries(client, 
jobName, 5, RetryUtils.DEFAULT_MAX_TRIES));
   }
 
+  public void createK8sJobWithRetries(Job job)
+  {
+    clientApi.executeRequest(client -> {
+      createK8sJobWithRetries(client, job, 5, RetryUtils.DEFAULT_MAX_TRIES);
+      return null;
+    });
+  }
+
+  /**
+   * Creates a Kubernetes job with retry logic for transient connection pool 
exceptions.
+   * <p>
+   * This method attempts to create the specified job in Kubernetes with 
built-in retry logic
+   * for transient connection pool issues. If the job already exists (HTTP 409 
conflict),
+   * the method returns successfully without throwing an exception, assuming 
the job was
+   * already submitted by a previous request.
+   * <p>
+   * The retry logic only applies to transient connection pool exceptions. 
Other exceptions will cause the method to
+   * fail immediately.
+   *
+   * @param client the Kubernetes client to use for job creation
+   * @param job the Kubernetes job to create
+   * @param quietTries number of initial retry attempts without logging 
warnings
+   * @param maxTries maximum total number of retry attempts
+   * @throws DruidException if job creation fails after all retry attempts or 
encounters non-retryable errors
+   */
+  @VisibleForTesting
+  void createK8sJobWithRetries(KubernetesClient client, Job job, int 
quietTries, int maxTries)
+  {
+    try {
+      RetryUtils.retry(
+          () -> {
+            try {
+              client.batch()
+                    .v1()
+                    .jobs()
+                    .inNamespace(namespace)
+                    .resource(job)
+                    .create();
+              return null;
+            }
+            catch (KubernetesClientException e) {
+              if (e.getCode() == 409) {
+                // Job already exists, return successfully
+                log.info("K8s job[%s] already exists, skipping creation", 
job.getMetadata().getName());
+                return null;
+              }
+              throw e;
+            }
+          },
+          this::isRetryableTransientConnectionPoolException, quietTries, 
maxTries
+      );
+    }
+    catch (Exception e) {
+      throw DruidException.defensive(e, "Error when creating K8s job[%s]", 
job.getMetadata().getName());
+    }
+  }
+
+  /**
+   * Waits for a Kubernetes pod to reach a ready state with retry logic for 
transient connection pool exceptions.
+   * <p>
+   * This method waits for the specified pod to have a valid status with a pod 
IP assigned, indicating
+   * it has been scheduled and is in a ready state. The method includes retry 
logic to handle transient
+   * connection pool exceptions that may occur during the wait operation.
+   * <p>
+   * The method will wait up to the specified timeout for the pod to become 
ready, and retry the entire wait operation
+   * if transient connection issues are encountered.
+   *
+   * @param client the Kubernetes client to use for pod operations
+   * @param pod the pod to wait for
+   * @param howLong the maximum time to wait for the pod to become ready
+   * @param timeUnit the time unit for the wait timeout
+   * @param quietTries number of initial retry attempts without logging 
warnings
+   * @param maxTries maximum total number of retry attempts
+   * @return the pod in its ready state, or null if the pod disappeared or 
the wait operation failed
+   * @throws DruidException if waiting fails after all retry attempts or 
encounters non-retryable errors
+   */
+  @VisibleForTesting
+  Pod waitForPodResultWithRetries(KubernetesClient client, Pod pod, long 
howLong, TimeUnit timeUnit, int quietTries, int maxTries)
+  {
+    try {
+      return RetryUtils.retry(
+          () -> client.pods()
+                    .inNamespace(namespace)
+                    .withName(pod.getMetadata().getName())
+                    .waitUntilCondition(
+                        p -> {
+                          if (p == null) {
+                            return true;
+                          }
+                          return p.getStatus() != null && 
p.getStatus().getPodIP() != null;
+                        }, howLong, timeUnit),
+          this::isRetryableTransientConnectionPoolException, quietTries, 
maxTries);
+    }
+    catch (Exception e) {
+      throw DruidException.defensive(e, "Error when waiting for pod[%s] to 
start", pod.getMetadata().getName());
+    }
+  }
+
+  /**
+   * Retrieves the pod associated with a Kubernetes job with retry logic for 
transient failures.
+   * <p>
+   * This method searches for a pod with the specified job name label and 
includes retry logic
+   * to handle both transient connection pool exceptions and cases where the 
pod may not be
+   * immediately available after job creation. If no pod is found, the method 
examines job
+   * events to provide detailed error information about pod creation failures.
+   * <p>
+   * The retry logic applies to:
+   * <ul>
+   *   <li>Transient connection pool exceptions</li>
+   *   <li>Pod not found scenarios, except when blacklisted error messages 
from {@link DruidK8sConstants#BLACKLISTED_PEON_POD_ERROR_MESSAGES} are 
encountered</li>
+   * </ul>
+   *
+   * @param client the Kubernetes client to use for pod and event operations
+   * @param jobName the name of the job whose pod should be retrieved
+   * @param quietTries number of initial retry attempts without logging 
warnings
+   * @param maxTries maximum total number of retry attempts
+   * @return the pod associated with the job
+   * @throws KubernetesResourceNotFoundException if the pod cannot be found 
after all retry attempts
+   * @throws DruidException if retrieval fails due to other errors
+   */
   @VisibleForTesting
   Pod getPeonPodWithRetries(KubernetesClient client, String jobName, int 
quietTries, int maxTries)
   {
@@ -317,7 +434,7 @@ public class KubernetesPeonClient
                   "Job[%s] failed to create pods. Message[%s]", jobName, 
latestEvent.getMessage());
             }
           },
-          this::shouldRetryStartingPeonPod, quietTries, maxTries
+          this::shouldRetryWaitForStartingPeonPod, quietTries, maxTries
       );
     }
     catch (KubernetesResourceNotFoundException e) {
@@ -337,8 +454,12 @@ public class KubernetesPeonClient
    * These substrings, found in {@link 
DruidK8sConstants#BLACKLISTED_PEON_POD_ERROR_MESSAGES},
 * represent Kubernetes events that indicate a retry for starting the Peon 
Pod would likely be futile.
    */
-  private boolean shouldRetryStartingPeonPod(Throwable e)
+  private boolean shouldRetryWaitForStartingPeonPod(Throwable e)
   {
+    if (isRetryableTransientConnectionPoolException(e)) {
+      return true;
+    }
+
     if (!(e instanceof KubernetesResourceNotFoundException)) {
       return false;
     }
@@ -353,6 +474,25 @@ public class KubernetesPeonClient
     return true;
   }
 
+  /**
+   * Checks if the exception is a potentially transient connection pool 
exception.
+   * <p>
+   * This method checks if the exception is one of the known transient 
connection pool exceptions
+   * and whether it contains a specific message substring, if applicable.
+   * <p>
+   * We have experienced connections in the pool being closed by the 
server side but remaining in the pool. These issues
+   * should be safe to retry in many cases.
+   */
+  private boolean isRetryableTransientConnectionPoolException(Throwable e)
+  {
+    if (e instanceof KubernetesClientException) {
+      return e.getMessage() != null && e.getMessage().contains("Connection was 
closed");
+    } else if (e instanceof HttpClosedException) {
+      return true;
+    }
+    return false;
+  }
+
   private List<Event> getPeonEvents(KubernetesClient client, String jobName)
   {
     ObjectReference objectReference = new ObjectReferenceBuilder()
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index 9c831217914..9f5670c333b 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -27,7 +27,6 @@ import io.fabric8.kubernetes.api.model.PodListBuilder;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -145,7 +144,7 @@ public class KubernetesPeonClientTest
     client.pods().inNamespace(NAMESPACE).resource(pod).create();
 
     Assertions.assertThrows(
-        KubernetesClientTimeoutException.class,
+        DruidException.class,
         () -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, 
TimeUnit.SECONDS)
     );
   }
@@ -713,4 +712,139 @@ public class KubernetesPeonClientTest
     Optional<LogWatch> maybeLogWatch = instance.getPeonLogWatcher(new 
K8sTaskId(TASK_NAME_PREFIX, ID));
     Assertions.assertFalse(maybeLogWatch.isPresent());
   }
+
+  @Test
+  void test_createK8sJobWithRetries_withSuccessfulCreation_createsJob()
+  {
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(KUBERNETES_JOB_NAME)
+        .endMetadata()
+        .build();
+
+    // Should not throw any exception
+    instance.createK8sJobWithRetries(job);
+
+    // Verify job was created
+    Job createdJob = 
client.batch().v1().jobs().inNamespace(NAMESPACE).withName(KUBERNETES_JOB_NAME).get();
+    Assertions.assertNotNull(createdJob);
+    Assertions.assertEquals(KUBERNETES_JOB_NAME, 
createdJob.getMetadata().getName());
+  }
+
+  @Test
+  void 
test_createK8sJobWithRetries_withNonRetryableException_failsImmediately()
+  {
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(KUBERNETES_JOB_NAME)
+        .endMetadata()
+        .build();
+
+    String jobPath = "/apis/batch/v1/namespaces/" + NAMESPACE + "/jobs";
+
+    // Return 403 Forbidden - this is not a retryable exception
+    server.expect().post()
+        .withPath(jobPath)
+        .andReturn(HttpURLConnection.HTTP_FORBIDDEN, "Forbidden: insufficient 
permissions")
+        .once();
+
+    // Should fail immediately without retries
+    DruidException e = Assertions.assertThrows(
+        DruidException.class,
+        () -> instance.createK8sJobWithRetries(clientApi.getClient(), job, 0, 
5)
+    );
+
+    // Verify the error message contains our job name
+    Assertions.assertTrue(e.getMessage().contains(KUBERNETES_JOB_NAME));
+  }
+
+  @Test
+  void test_createK8sJobWithRetries_withJobAlreadyExists_succeedsGracefully()
+  {
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(KUBERNETES_JOB_NAME)
+        .endMetadata()
+        .build();
+
+    String jobPath = "/apis/batch/v1/namespaces/" + NAMESPACE + "/jobs";
+
+    // Return 409 Conflict - job already exists
+    server.expect().post()
+        .withPath(jobPath)
+        .andReturn(HttpURLConnection.HTTP_CONFLICT, "Job already exists")
+        .once();
+
+    // Should succeed gracefully without throwing exception
+    Assertions.assertDoesNotThrow(
+        () -> instance.createK8sJobWithRetries(clientApi.getClient(), job, 0, 
5)
+    );
+  }
+
+  @Test
+  void test_waitForPodResultWithRetries_withSuccessfulPodReady_returnsPod()
+  {
+    Pod pod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .endMetadata()
+        .withNewStatus()
+        .withPodIP("192.168.1.100")
+        .endStatus()
+        .build();
+
+    // Create the pod in the mock client
+    client.pods().inNamespace(NAMESPACE).resource(pod).create();
+
+    // Should return the pod successfully
+    Pod result = instance.waitForPodResultWithRetries(
+        clientApi.getClient(), 
+        pod, 
+        1, 
+        TimeUnit.SECONDS, 
+        0, 
+        3
+    );
+
+    Assertions.assertNotNull(result);
+    Assertions.assertEquals(POD_NAME, result.getMetadata().getName());
+    Assertions.assertEquals("192.168.1.100", result.getStatus().getPodIP());
+  }
+
+  @Test
+  void 
test_waitForPodResultWithRetries_withNonRetryableFailure_throwsDruidException()
+  {
+    Pod pod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .endMetadata()
+        .withNewStatus()
+        .withPodIP(null) // Pod without IP, will timeout
+        .endStatus()
+        .build();
+
+    String podPath = "/api/v1/namespaces/" + NAMESPACE + "/pods/" + POD_NAME;
+
+    // Mock server to return the pod without IP, causing timeout
+    server.expect().get()
+        .withPath(podPath + "?watch=true")
+        .andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "Internal server 
error")
+        .once();
+
+    // Should throw DruidException after failure
+    DruidException e = Assertions.assertThrows(
+        DruidException.class,
+        () -> instance.waitForPodResultWithRetries(
+            clientApi.getClient(), 
+            pod, 
+            1, 
+            TimeUnit.MILLISECONDS, // Very short timeout to force failure
+            0, 
+            1
+        )
+    );
+
+    // Verify the error message contains our pod name
+    Assertions.assertTrue(e.getMessage().contains(POD_NAME));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to